Skip to content

Commit

Permalink
Merge branch 'add-statsd-stats-to-tcp2udp-des-392'
Browse files Browse the repository at this point in the history
  • Loading branch information
faern committed Jan 2, 2024
2 parents 104ccaf + d0405fd commit 39f8796
Show file tree
Hide file tree
Showing 7 changed files with 219 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ Line wrap the file at 100 chars. Th


## [Unreleased]
### Changed
- Add (optional) statsd metrics reporting support to `tcp2udp` binary and library module when the
`statsd` cargo feature is enabled.


## [0.3.1] - 2023-10-25
Expand Down
29 changes: 29 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ opt-level = 3
lto = true
codegen-units = 1

[features]
# Enable this feature to make it possible to have tcp2udp report metrics over statsd
statsd = ["cadence"]

[dependencies]
tokio = { version = "1.0", features = ["rt-multi-thread", "macros", "net", "time", "io-util"] }
err-context = "0.1.0"
Expand All @@ -32,6 +36,7 @@ lazy_static = "1.4.0"
# Only used by the binaries in src/bin/ and is optional so it's not
# pulled in when built as a library.
env_logger = { version = "0.10.0", optional = true }
cadence = { version = "1.0.0", optional = true }

[target.'cfg(target_os = "linux")'.dependencies]
nix = { version = "0.27.1", features = ["socket"] }
1 change: 1 addition & 0 deletions build-static-bins.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ RUSTFLAGS="-C target-feature=+crt-static" \
--target x86_64-unknown-linux-gnu \
--features env_logger \
--features clap \
--features statsd \
--bins
146 changes: 146 additions & 0 deletions src/statsd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
#[cfg(feature = "statsd")]
pub use real::Error;

pub struct StatsdMetrics(StatsdMetricsChooser);

enum StatsdMetricsChooser {
Dummy,
#[cfg(feature = "statsd")]
Real(real::StatsdMetrics),
}

impl StatsdMetrics {
/// Creates a dummy statsd metrics instance. Does not actually connect to any statds
/// server, nor emits any events. Used as an API compatible drop in when metrics
/// should not be emitted.
pub fn dummy() -> Self {
Self(StatsdMetricsChooser::Dummy)
}

/// Creates a statsd metric reporting instance connecting to the given host addr.
#[cfg(feature = "statsd")]
pub fn real(host: std::net::SocketAddr) -> Result<Self, Error> {
let statsd = real::StatsdMetrics::new(host)?;
Ok(Self(StatsdMetricsChooser::Real(statsd)))
}

/// Emit a metric saying we failed to accept an incoming TCP connection (probably ran out of file descriptors)
pub fn accept_error(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.accept_error()
}
}

/// Increment the connection counter inside this metrics instance and emit that new gauge value
pub fn incr_connections(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.incr_connections()
}
}

/// Decrement the connection counter inside this metrics instance and emit that new gauge value
pub fn decr_connections(&self) {
#[cfg(feature = "statsd")]
if let StatsdMetricsChooser::Real(statsd) = &self.0 {
statsd.decr_connections()
}
}
}

#[cfg(feature = "statsd")]
mod real {
use cadence::{CountedExt, Gauged, QueuingMetricSink, StatsdClient, UdpMetricSink};
use std::sync::atomic::{AtomicU64, Ordering};

/// Queue with a maximum capacity of 8K events.
/// This program is extremely unlikely to ever reach that upper bound.
/// The bound is still here so that if it ever were to happen, we drop events
/// instead of indefinitely filling the memory with unsent events.
const QUEUE_SIZE: usize = 8 * 1024;

const PREFIX: &str = "tcp2udp";

#[derive(Debug)]
pub enum Error {
/// Failed to create + bind the statsd UDP socket.
BindUdpSocket(std::io::Error),
/// Failed to create statsd client.
CreateStatsdClient(cadence::MetricError),
}

impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use Error::*;
match self {
BindUdpSocket(_) => "Failed to bind the UDP socket".fmt(f),
CreateStatsdClient(e) => e.fmt(f),
}
}
}

impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
BindUdpSocket(e) => Some(e),
CreateStatsdClient(e) => e.source(),
}
}
}

pub struct StatsdMetrics {
client: StatsdClient,
num_connections: AtomicU64,
}

impl StatsdMetrics {
pub fn new(host: std::net::SocketAddr) -> Result<Self, Error> {
let socket = std::net::UdpSocket::bind("0.0.0.0:0").map_err(Error::BindUdpSocket)?;
log::debug!(
"Statsd socket bound to {}",
socket
.local_addr()
.map(|a| a.to_string())
.unwrap_or_else(|_| "Unknown".to_owned())
);

// Create a non-buffered blocking metrics sink. It is important that it's not buffered,
// so events are emitted instantly when they happen (this program does not emit a lot of
// events, nor does it attach timestamps to the events.
// The fact that it's blocking does not matter, since the `QueuingMetricSink` will make sure
// the `UdpMetricSink` runs in its own thread anyway.
let udp_sink = UdpMetricSink::from(host, socket).map_err(Error::CreateStatsdClient)?;
let queuing_sink = QueuingMetricSink::with_capacity(udp_sink, QUEUE_SIZE);
let statds_client = StatsdClient::from_sink(PREFIX, queuing_sink);
Ok(Self {
client: statds_client,
num_connections: AtomicU64::new(0),
})
}

pub fn accept_error(&self) {
log::debug!("Sending statsd tcp_accept_errors");
if let Err(e) = self.client.incr("tcp_accept_errors") {
log::error!("Failed to emit statsd tcp_accept_errors: {e}");
}
}

pub fn incr_connections(&self) {
let num_connections = self.num_connections.fetch_add(1, Ordering::Relaxed) + 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}

pub fn decr_connections(&self) {
let num_connections = self.num_connections.fetch_sub(1, Ordering::Relaxed) - 1;
log::debug!("Sending statsd num_connections = {num_connections}");
if let Err(e) = self.client.gauge("num_connections", num_connections) {
log::error!("Failed to emit statsd num_connections: {e}");
}
}
}
}
34 changes: 34 additions & 0 deletions src/tcp2udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ use std::convert::Infallible;
use std::fmt;
use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::{TcpListener, TcpSocket, TcpStream, UdpSocket};
use tokio::time::sleep;

#[path = "statsd.rs"]
mod statsd;

#[derive(Debug)]
#[cfg_attr(feature = "clap", derive(clap::Parser))]
#[cfg_attr(feature = "clap", group(skip))]
Expand All @@ -31,6 +35,11 @@ pub struct Options {

#[cfg_attr(feature = "clap", clap(flatten))]
pub tcp_options: crate::tcp_options::TcpOptions,

#[cfg(feature = "statsd")]
/// Host to send statsd metrics to.
#[cfg_attr(feature = "clap", clap(long))]
statsd_host: Option<SocketAddr>,
}

/// Error returned from [`run`] if something goes wrong.
Expand All @@ -47,6 +56,9 @@ pub enum Tcp2UdpError {
BindTcpSocket(io::Error, SocketAddr),
/// Failed to start listening on TCP socket
ListenTcpSocket(io::Error, SocketAddr),
#[cfg(feature = "statsd")]
/// Failed to initialize statsd client
CreateStatsdClient(statsd::Error),
}

impl fmt::Display for Tcp2UdpError {
Expand All @@ -63,6 +75,8 @@ impl fmt::Display for Tcp2UdpError {
"Failed to start listening on TCP socket bound to {}",
addr
),
#[cfg(feature = "statsd")]
CreateStatsdClient(_) => "Failed to init metrics client".fmt(f),
}
}
}
Expand All @@ -77,6 +91,8 @@ impl std::error::Error for Tcp2UdpError {
SetReuseAddr(e) => Some(e),
BindTcpSocket(e, _) => Some(e),
ListenTcpSocket(e, _) => Some(e),
#[cfg(feature = "statsd")]
CreateStatsdClient(e) => Some(e),
}
}
}
Expand All @@ -98,6 +114,16 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
}
});

#[cfg(not(feature = "statsd"))]
let statsd = Arc::new(statsd::StatsdMetrics::dummy());
#[cfg(feature = "statsd")]
let statsd = Arc::new(match options.statsd_host {
None => statsd::StatsdMetrics::dummy(),
Some(statsd_host) => {
statsd::StatsdMetrics::real(statsd_host).map_err(Tcp2UdpError::CreateStatsdClient)?
}
});

let mut join_handles = Vec::with_capacity(options.tcp_listen_addrs.len());
for tcp_listen_addr in options.tcp_listen_addrs {
let tcp_listener = create_listening_socket(tcp_listen_addr, &options.tcp_options)?;
Expand All @@ -106,13 +132,15 @@ pub async fn run(options: Options) -> Result<Infallible, Tcp2UdpError> {
let udp_forward_addr = options.udp_forward_addr;
let tcp_recv_timeout = options.tcp_options.recv_timeout;
let tcp_nodelay = options.tcp_options.nodelay;
let statsd = Arc::clone(&statsd);
join_handles.push(tokio::spawn(async move {
process_tcp_listener(
tcp_listener,
udp_bind_ip,
udp_forward_addr,
tcp_recv_timeout,
tcp_nodelay,
statsd,
)
.await;
}));
Expand Down Expand Up @@ -150,6 +178,7 @@ async fn process_tcp_listener(
udp_forward_addr: SocketAddr,
tcp_recv_timeout: Option<Duration>,
tcp_nodelay: bool,
statsd: Arc<statsd::StatsdMetrics>,
) -> ! {
let mut cooldown =
ExponentialBackoff::new(Duration::from_millis(50), Duration::from_millis(5000));
Expand All @@ -160,7 +189,9 @@ async fn process_tcp_listener(
if let Err(error) = crate::tcp_options::set_nodelay(&tcp_stream, tcp_nodelay) {
log::error!("Error: {}", error.display("\nCaused by: "));
}
let statsd = statsd.clone();
tokio::spawn(async move {
statsd.incr_connections();
if let Err(error) = process_socket(
tcp_stream,
tcp_peer_addr,
Expand All @@ -172,12 +203,15 @@ async fn process_tcp_listener(
{
log::error!("Error: {}", error.display("\nCaused by: "));
}
statsd.decr_connections();
});
cooldown.reset();
}
Err(error) => {
log::error!("Error when accepting incoming TCP connection: {}", error);

statsd.accept_error();

// If the process runs out of file descriptors, it will fail to accept a socket.
// But that socket will also remain in the queue, so it will fail again immediately.
// This will busy loop consuming the CPU and filling any logs. To prevent this,
Expand Down
2 changes: 1 addition & 1 deletion tcp2udp.service
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ LimitNOFILE=16384
# Uncomment this to have the logs not contain the IPs of the peers using this service
#Environment=REDACT_LOGS=1
Environment=RUST_LOG=debug
ExecStart=/usr/local/bin/tcp2udp --threads=2 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay
ExecStart=/usr/local/bin/tcp2udp --threads=2 --statsd-host 127.0.0.1:8125 --tcp-listen 0.0.0.0:443 --udp-bind=127.0.0.1 --udp-forward 127.0.0.1:51820 --tcp-recv-timeout=130 --nodelay

Restart=always
RestartSec=2
Expand Down

0 comments on commit 39f8796

Please sign in to comment.