From c04c4bcacca735f0bbd5e4e78a68a794f8ef0260 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Thu, 30 Nov 2023 16:02:50 +0100 Subject: [PATCH 1/4] Add initial statsd reporting --- CHANGELOG.md | 3 + Cargo.lock | 29 ++++++++++ Cargo.toml | 5 ++ src/statsd.rs | 146 +++++++++++++++++++++++++++++++++++++++++++++++++ src/tcp2udp.rs | 34 ++++++++++++ 5 files changed, 217 insertions(+) create mode 100644 src/statsd.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index cb40b2f..47537e1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,9 @@ Line wrap the file at 100 chars. Th ## [Unreleased] +### Changed +- Add (optional) statsd metrics reporting support to `tcp2udp` binary and library module when the + `statsd` cargo feature is enabled. ## [0.3.1] - 2023-10-25 diff --git a/Cargo.lock b/Cargo.lock index 85f955b..188259c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -107,6 +107,15 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "cadence" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab51a759f502097abe855100b81b421d3a104b62a2c3209f751d90ce6dd2ea1" +dependencies = [ + "crossbeam-channel", +] + [[package]] name = "cc" version = "1.0.83" @@ -168,6 +177,25 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" +dependencies = [ + "cfg-if", +] + [[package]] name = "env_logger" version = "0.10.0" @@ -577,6 +605,7 @@ dependencies = [ name = "udp-over-tcp" version = "0.3.1" dependencies = [ + "cadence", "clap", "env_logger", "err-context", diff --git a/Cargo.toml b/Cargo.toml index 3b63a22..6f7af5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,10 @@ opt-level = 3 lto = true codegen-units = 1 +[features] +# Enable this feature to make it possible to have tcp2udp report metrics over statsd +statsd = ["cadence"] + [dependencies] tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] } err-context = "0.1.0" @@ -32,6 +36,7 @@ lazy_static = "1.4.0" # Only used by the binaries in src/bin/ and is optional so it's not # pulled in when built as a library. env_logger = { version = "0.10.0", optional = true } +cadence = { version = "1.0.0", optional = true } [target.'cfg(target_os = "linux")'.dependencies] nix = { version = "0.27.1", features = ["socket"] } diff --git a/src/statsd.rs b/src/statsd.rs new file mode 100644 index 0000000..b3a8b97 --- /dev/null +++ b/src/statsd.rs @@ -0,0 +1,146 @@ +#[cfg(feature = "statsd")] +pub use real::Error; + +pub struct StatsdMetrics(StatsdMetricsChooser); + +enum StatsdMetricsChooser { + Dummy, + #[cfg(feature = "statsd")] + Real(real::StatsdMetrics), +} + +impl StatsdMetrics { + /// Creates a dummy statsd metrics instance. Does not actually connect to any statds + /// server, nor emits any events. Used as an API compatible drop in when metrics + /// should not be emitted. + pub fn dummy() -> Self { + Self(StatsdMetricsChooser::Dummy) + } + + /// Creates a statsd metric reporting instance connecting to the given host addr. + #[cfg(feature = "statsd")] + pub fn real(host: std::net::SocketAddr) -> Result { + let statsd = real::StatsdMetrics::new(host)?; + Ok(Self(StatsdMetricsChooser::Real(statsd))) + } + + /// Emit a metric saying we failed to accept an incoming TCP connection (probably ran out of file descriptors) + pub fn accept_error(&self) { + #[cfg(feature = "statsd")] + if let StatsdMetricsChooser::Real(statsd) = &self.0 { + statsd.accept_error() + } + } + + /// Increment the connection counter inside this metrics instance and emit that new gauge value + pub fn incr_connections(&self) { + #[cfg(feature = "statsd")] + if let StatsdMetricsChooser::Real(statsd) = &self.0 { + statsd.incr_connections() + } + } + + /// Decrement the connection counter inside this metrics instance and emit that new gauge value + pub fn decr_connections(&self) { + #[cfg(feature = "statsd")] + if let StatsdMetricsChooser::Real(statsd) = &self.0 { + statsd.decr_connections() + } + } +} + +#[cfg(feature = "statsd")] +mod real { + use cadence::{CountedExt, Gauged, QueuingMetricSink, StatsdClient, UdpMetricSink}; + use std::sync::atomic::{AtomicU64, Ordering}; + + /// Queue with a maximum capacity of 8K events. + /// This program is extremely unlikely to ever reach that upper bound. + /// The bound is still here so that if it ever were to happen, we drop events + /// instead of indefinitely filling the memory with unsent events. + const QUEUE_SIZE: usize = 8 * 1024; + + const PREFIX: &str = "tcp2udp"; + + #[derive(Debug)] + pub enum Error { + /// Failed to create + bind the statsd UDP socket. + BindUdpSocket(std::io::Error), + /// Failed to create statsd client. + CreateStatsdClient(cadence::MetricError), + } + + impl std::fmt::Display for Error { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Error::*; + match self { + BindUdpSocket(_) => "Failed to bind the UDP socket".fmt(f), + CreateStatsdClient(e) => e.fmt(f), + } + } + } + + impl std::error::Error for Error { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + use Error::*; + match self { + BindUdpSocket(e) => Some(e), + CreateStatsdClient(e) => e.source(), + } + } + } + + pub struct StatsdMetrics { + client: StatsdClient, + num_connections: AtomicU64, + } + + impl StatsdMetrics { + pub fn new(host: std::net::SocketAddr) -> Result { + let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(Error::BindUdpSocket)?; + log::debug!( + "Statsd socket bound to {}", + socket + .local_addr() + .map(|a| a.to_string()) + .unwrap_or_else(|_| "Unknown".to_owned()) + ); + + // Create a non-buffered blocking metrics sink. It is important that it's not buffered, + // so events are emitted instantly when they happen (this program does not emit a lot of + // events, nor does it attach timestamps to the events. + // The fact that it's blocking does not matter, since the `QueuingMetricSink` will make sure + // the `UdpMetricSink` runs in its own thread anyway. + let udp_sink = UdpMetricSink::from(host, socket).map_err(Error::CreateStatsdClient)?; + let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE); + let statds_client = StatsdClient::from_sink(PREFIX, queuing_sink); + Ok(Self { + client: statds_client, + num_connections: AtomicU64::new(0), + }) + } + + pub fn accept_error(&self) { + log::debug!("Sending statsd tcp_accept_errors"); + if let Err(e) = self.client.incr("tcp_accept_errors") { + log::error!("Failed to emit statsd tcp_accept_errors: {e}"); + } + } + + pub fn incr_connections(&self) { + let num_connections = self.num_connections.fetch_add(1, Ordering::SeqCst) + 1; + log::debug!("Sending statsd num_connections = {num_connections}"); + if let Err(e) = self.client.gauge("num_connections", num_connections) { + log::error!("Failed to emit statsd num_connections: {e}"); + } + } + + pub fn decr_connections(&self) { + let num_connections = self.num_connections.fetch_sub(1, Ordering::SeqCst) - 1; + log::debug!("Sending statsd num_connections = {num_connections}"); + if let Err(e) = self.client.gauge("num_connections", num_connections) { + log::error!("Failed to emit statsd num_connections: {e}"); + } + } + } +} diff --git a/src/tcp2udp.rs b/src/tcp2udp.rs index 49dac39..0ebe2ae 100644 --- a/src/tcp2udp.rs +++ b/src/tcp2udp.rs @@ -8,10 +8,14 @@ use std::convert::Infallible; use std::fmt; use std::io; use std::net::{IpAddr, SocketAddr}; +use std::sync::Arc; use std::time::Duration; use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket}; use tokio::time::sleep; +#[path = "statsd.rs"] +mod statsd; + #[derive(Debug)] #[cfg_attr(feature = "clap", derive(clap::Parser))] #[cfg_attr(feature = "clap", group(skip))] @@ -31,6 +35,11 @@ pub struct Options { #[cfg_attr(feature = "clap", clap(flatten))] pub tcp_options: crate::tcp_options::TcpOptions, + + #[cfg(feature = "statsd")] + /// Host to send statsd metrics to. + #[cfg_attr(feature = "clap", clap(long))] + statsd_host: Option, } /// Error returned from [`run`] if something goes wrong. @@ -47,6 +56,9 @@ pub enum Tcp2UdpError { BindTcpSocket(io::Error, SocketAddr), /// Failed to start listening on TCP socket ListenTcpSocket(io::Error, SocketAddr), + #[cfg(feature = "statsd")] + /// Failed to initialize statsd client + CreateStatsdClient(statsd::Error), } impl fmt::Display for Tcp2UdpError { @@ -63,6 +75,8 @@ impl fmt::Display for Tcp2UdpError { "Failed to start listening on TCP socket bound to {}", addr ), + #[cfg(feature = "statsd")] + CreateStatsdClient(_) => "Failed to init metrics client".fmt(f), } } } @@ -77,6 +91,8 @@ impl std::error::Error for Tcp2UdpError { SetReuseAddr(e) => Some(e), BindTcpSocket(e, _) => Some(e), ListenTcpSocket(e, _) => Some(e), + #[cfg(feature = "statsd")] + CreateStatsdClient(e) => Some(e), } } } @@ -98,6 +114,16 @@ pub async fn run(options: Options) -> Result { } }); + #[cfg(not(feature = "statsd"))] + let statsd = Arc::new(statsd::StatsdMetrics::dummy()); + #[cfg(feature = "statsd")] + let statsd = Arc::new(match options.statsd_host { + None => statsd::StatsdMetrics::dummy(), + Some(statsd_host) => { + statsd::StatsdMetrics::real(statsd_host).map_err(Tcp2UdpError::CreateStatsdClient)? + } + }); + let mut join_handles = Vec::with_capacity(options.tcp_listen_addrs.len()); for tcp_listen_addr in options.tcp_listen_addrs { let tcp_listener = create_listening_socket(tcp_listen_addr, &options.tcp_options)?; @@ -106,6 +132,7 @@ pub async fn run(options: Options) -> Result { let udp_forward_addr = options.udp_forward_addr; let tcp_recv_timeout = options.tcp_options.recv_timeout; let tcp_nodelay = options.tcp_options.nodelay; + let statsd = Arc::clone(&statsd); join_handles.push(tokio::spawn(async move { process_tcp_listener( tcp_listener, @@ -113,6 +140,7 @@ pub async fn run(options: Options) -> Result { udp_forward_addr, tcp_recv_timeout, tcp_nodelay, + statsd, ) .await; })); @@ -150,6 +178,7 @@ async fn process_tcp_listener( udp_forward_addr: SocketAddr, tcp_recv_timeout: Option, tcp_nodelay: bool, + statsd: Arc, ) -> ! { let mut cooldown = ExponentialBackoff::new(Duration::from_millis(50), Duration::from_millis(5000)); @@ -160,7 +189,9 @@ async fn process_tcp_listener( if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) { log::error!("Error: {}", error.display("\nCaused by: ")); } + let statsd = statsd.clone(); tokio::spawn(async move { + statsd.incr_connections(); if let Err(error) = process_socket( tcp_stream, tcp_peer_addr, @@ -172,12 +203,15 @@ async fn process_tcp_listener( { log::error!("Error: {}", error.display("\nCaused by: ")); } + statsd.decr_connections(); }); cooldown.reset(); } Err(error) => { log::error!("Error when accepting incoming TCP connection: {}", error); + statsd.accept_error(); + // If the process runs out of file descriptors, it will fail to accept a socket. // But that socket will also remain in the queue, so it will fail again immediately. // This will busy loop consuming the CPU and filling any logs. To prevent this, From 3bb6a2b2df6c3d43ba2000e22c69f5696a98fbc5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Tue, 19 Dec 2023 13:47:47 +0100 Subject: [PATCH 2/4] Relax atomic ordering on connection counter Ordering not required. No other data depends on this counter --- src/statsd.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/statsd.rs b/src/statsd.rs index b3a8b97..c3a9655 100644 --- a/src/statsd.rs +++ b/src/statsd.rs @@ -128,7 +128,7 @@ mod real { } pub fn incr_connections(&self) { - let num_connections = self.num_connections.fetch_add(1, Ordering::SeqCst) + 1; + let num_connections = self.num_connections.fetch_add(1, Ordering::Relaxed) + 1; log::debug!("Sending statsd num_connections = {num_connections}"); if let Err(e) = self.client.gauge("num_connections", num_connections) { log::error!("Failed to emit statsd num_connections: {e}"); @@ -136,7 +136,7 @@ mod real { } pub fn decr_connections(&self) { - let num_connections = self.num_connections.fetch_sub(1, Ordering::SeqCst) - 1; + let num_connections = self.num_connections.fetch_sub(1, Ordering::Relaxed) - 1; log::debug!("Sending statsd num_connections = {num_connections}"); if let Err(e) = self.client.gauge("num_connections", num_connections) { log::error!("Failed to emit statsd num_connections: {e}"); From 29ac80fe5a0c5e2c7aadf4758aae0fc02cc71e5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Wed, 13 Dec 2023 15:11:08 +0100 Subject: [PATCH 3/4] Enable statsd in build-static-bins.sh --- build-static-bins.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/build-static-bins.sh b/build-static-bins.sh index fb1d888..8b15654 100755 --- a/build-static-bins.sh +++ b/build-static-bins.sh @@ -12,4 +12,5 @@ RUSTFLAGS="-C target-feature=+crt-static" \ --target x86_64-unknown-linux-gnu \ --features env_logger \ --features clap \ + --features statsd \ --bins From d0405fd1d83ca2b802017725cc1ad0206a439d5c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Wed, 13 Dec 2023 15:31:54 +0100 Subject: [PATCH 4/4] Add statsd argument to example tcp2udp systemd service file --- tcp2udp.service | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tcp2udp.service b/tcp2udp.service index 279c242..4915bc6 100644 --- a/tcp2udp.service +++ b/tcp2udp.service @@ -20,7 +20,7 @@ LimitNOFILE=16384 # Uncomment this to have the logs not contain the IPs of the peers using this service #Environment=REDACT_LOGS=1 Environment=RUST_LOG=debug -ExecStart=/usr/local/bin/tcp2udp --threads=2 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay +ExecStart=/usr/local/bin/tcp2udp --threads=2 --statsd-host 127.0.0.1:8125 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay Restart=always RestartSec=2