Skip to content

Commit

Permalink
udpoutgoing metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Dec 19, 2024
1 parent 1ebc509 commit df0a007
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 8 deletions.
2 changes: 1 addition & 1 deletion mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ impl ClientConnectionHandler {
let dns_api = Self::create_dns_api(bg_tasks.dns);

let tcp_outgoing_api = TcpOutgoingApi::new(pid, state.metrics.clone());
let udp_outgoing_api = UdpOutgoingApi::new(pid);
let udp_outgoing_api = UdpOutgoingApi::new(pid, state.metrics.clone());

let client_handler = Self {
id,
Expand Down
42 changes: 42 additions & 0 deletions mirrord/agent/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ async fn get_metrics(
steal_unfiltered_port_subscription_count,
steal_connection_subscription_count,
tcp_outgoing_connection_count,
udp_outgoing_connection_count,
} = metrics.ask(MetricsGetAll).await?;

prometheus_metrics.open_fd_count.set(open_fd_count as i64);
Expand Down Expand Up @@ -78,6 +79,10 @@ async fn get_metrics(
.tcp_outgoing_connection_count
.set(tcp_outgoing_connection_count as i64);

prometheus_metrics
.udp_outgoing_connection_count
.set(udp_outgoing_connection_count as i64);

let metric_families = prometheus::gather();

let mut buffer = Vec::new();
Expand All @@ -97,6 +102,7 @@ struct PrometheusMetrics {
steal_unfiltered_port_subscription_count: GenericGauge<AtomicI64>,
steal_connection_subscription_count: GenericGauge<AtomicI64>,
tcp_outgoing_connection_count: GenericGauge<AtomicI64>,
udp_outgoing_connection_count: GenericGauge<AtomicI64>,
}

impl PrometheusMetrics {
Expand Down Expand Up @@ -132,6 +138,10 @@ impl PrometheusMetrics {
"mirrord_agent_tcp_outgoing_connection_count",
"amount of tcp outgoing connections in mirrord-agent"
)?,
udp_outgoing_connection_count: register_int_gauge!(
"mirrord_agent_udp_outgoing_connection_count",
"amount of udp outgoing connections in mirrord-agent"
)?,
})
}
}
Expand All @@ -146,6 +156,7 @@ pub(crate) struct MetricsActor {
steal_unfiltered_port_subscription_count: u64,
steal_connection_subscription_count: u64,
tcp_outgoing_connection_count: u64,
udp_outgoing_connection_count: u64,
}

impl MetricsActor {
Expand Down Expand Up @@ -213,6 +224,9 @@ pub(crate) struct MetricsDecStealConnectionSubscription;
pub(crate) struct MetricsIncTcpOutgoingConnection;
pub(crate) struct MetricsDecTcpOutgoingConnection;

pub(crate) struct MetricsIncUdpOutgoingConnection;
pub(crate) struct MetricsDecUdpOutgoingConnection;

pub(crate) struct MetricsGetAll;

#[derive(Reply, Serialize)]
Expand All @@ -224,6 +238,7 @@ pub(crate) struct MetricsGetAllReply {
steal_unfiltered_port_subscription_count: u64,
steal_connection_subscription_count: u64,
tcp_outgoing_connection_count: u64,
udp_outgoing_connection_count: u64,
}

impl Message<MetricsIncFd> for MetricsActor {
Expand Down Expand Up @@ -421,6 +436,32 @@ impl Message<MetricsDecTcpOutgoingConnection> for MetricsActor {
}
}

impl Message<MetricsIncUdpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncUdpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.udp_outgoing_connection_count += 1;
}
}

impl Message<MetricsDecUdpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecUdpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.udp_outgoing_connection_count = self.udp_outgoing_connection_count.saturating_sub(1);
}
}

impl Message<MetricsGetAll> for MetricsActor {
type Reply = MetricsGetAllReply;

Expand All @@ -438,6 +479,7 @@ impl Message<MetricsGetAll> for MetricsActor {
steal_unfiltered_port_subscription_count: self.steal_unfiltered_port_subscription_count,
steal_connection_subscription_count: self.steal_connection_subscription_count,
tcp_outgoing_connection_count: self.tcp_outgoing_connection_count,
udp_outgoing_connection_count: self.udp_outgoing_connection_count,
}
}
}
42 changes: 36 additions & 6 deletions mirrord/agent/src/outgoing/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use futures::{
prelude::*,
stream::{SplitSink, SplitStream},
};
use kameo::actor::ActorRef;
use mirrord_protocol::{
outgoing::{udp::*, *},
ConnectionId, ResponseError,
Expand All @@ -22,8 +23,10 @@ use tokio::{
use tokio_util::{codec::BytesCodec, udp::UdpFramed};
use tracing::{debug, trace, warn};

use super::MetricsActor;
use crate::{
error::Result,
metrics::{MetricsDecUdpOutgoingConnection, MetricsIncUdpOutgoingConnection},
util::run_thread_in_namespace,
watched_task::{TaskStatus, WatchedTask},
};
Expand Down Expand Up @@ -71,12 +74,14 @@ async fn connect(remote_address: SocketAddr) -> Result<UdpSocket, ResponseError>
impl UdpOutgoingApi {
const TASK_NAME: &'static str = "UdpOutgoing";

pub(crate) fn new(pid: Option<u64>) -> Self {
pub(crate) fn new(pid: Option<u64>, metrics: ActorRef<MetricsActor>) -> Self {
let (layer_tx, layer_rx) = mpsc::channel(1000);
let (daemon_tx, daemon_rx) = mpsc::channel(1000);

let watched_task =
WatchedTask::new(Self::TASK_NAME, Self::interceptor_task(layer_rx, daemon_tx));
let watched_task = WatchedTask::new(
Self::TASK_NAME,
Self::interceptor_task(layer_rx, daemon_tx, metrics),
);

let task_status = watched_task.status();
let task = run_thread_in_namespace(
Expand All @@ -101,6 +106,7 @@ impl UdpOutgoingApi {
async fn interceptor_task(
mut layer_rx: Receiver<Layer>,
daemon_tx: Sender<Daemon>,
metrics: ActorRef<MetricsActor>,
) -> Result<()> {
let mut connection_ids = 0..=ConnectionId::MAX;

Expand Down Expand Up @@ -136,11 +142,14 @@ impl UdpOutgoingApi {
.ok_or_else(|| ResponseError::IdsExhausted("connect".into()))?;

debug!("interceptor_task -> mirror_socket {:#?}", mirror_socket);

let peer_address = mirror_socket.peer_addr()?;
let local_address = mirror_socket.local_addr()?;
let local_address = SocketAddress::Ip(local_address);

let framed = UdpFramed::new(mirror_socket, BytesCodec::new());
debug!("interceptor_task -> framed {:#?}", framed);

let (sink, stream): (
SplitSink<UdpFramed<BytesCodec>, (BytesMut, SocketAddr)>,
SplitStream<UdpFramed<BytesCodec>>,
Expand All @@ -158,7 +167,13 @@ impl UdpOutgoingApi {

let daemon_message = DaemonUdpOutgoing::Connect(daemon_connect);
debug!("interceptor_task -> daemon_message {:#?}", daemon_message);
daemon_tx.send(daemon_message).await?

daemon_tx.send(daemon_message).await?;

let _ = metrics
.tell(MetricsIncUdpOutgoingConnection)
.await
.inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!"));
}
// [user] -> [layer] -> [agent] -> [remote]
// `user` wrote some message to the remote host.
Expand All @@ -183,14 +198,24 @@ impl UdpOutgoingApi {
readers.remove(&connection_id);

let daemon_message = DaemonUdpOutgoing::Close(connection_id);
daemon_tx.send(daemon_message).await?
daemon_tx.send(daemon_message).await?;

let _ = metrics
.tell(MetricsDecUdpOutgoingConnection)
.await
.inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!"));
}
}
// [layer] -> [agent]
// `layer` closed their interceptor stream.
LayerUdpOutgoing::Close(LayerClose { ref connection_id }) => {
writers.remove(connection_id);
readers.remove(connection_id);

let _ = metrics
.tell(MetricsDecUdpOutgoingConnection)
.await
.inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!"));
}
}
}
Expand All @@ -216,7 +241,12 @@ impl UdpOutgoingApi {
readers.remove(&connection_id);

let daemon_message = DaemonUdpOutgoing::Close(connection_id);
daemon_tx.send(daemon_message).await?
daemon_tx.send(daemon_message).await?;

let _ = metrics
.tell(MetricsDecUdpOutgoingConnection)
.await
.inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!"));
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion mirrord/protocol/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,16 @@ pub enum ClientMessage {
/// These are the messages used by the `steal` feature, and handled by the `TcpStealerApi` in
/// the agent.
TcpSteal(LayerTcpSteal),
// TODO(alex) [high]: Outgoing is next!
/// TCP outgoing message.
///
/// These are the messages used by the `outgoing` feature (tcp), and handled by the
/// `TcpOutgoingApi` in the agent.
TcpOutgoing(LayerTcpOutgoing),

/// UDP outgoing message.
///
/// These are the messages used by the `outgoing` feature (udp), and handled by the
/// `UdpOutgoingApi` in the agent.
UdpOutgoing(LayerUdpOutgoing),
FileRequest(FileRequest),
GetEnvVarsRequest(GetEnvVarsRequest),
Expand Down
36 changes: 36 additions & 0 deletions mirrord/protocol/src/outgoing/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,50 @@ use crate::RemoteResult;

#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)]
pub enum LayerUdpOutgoing {
/// User is interested in connecting via udp to some remote address, specified in
/// [`LayerConnect`].
///
/// The layer will get a mirrord managed address that it'll `connect` to, meanwhile
/// in the agent we `connect` to the actual remote address.
///
/// Saying that we have an _udp connection_ is a bit weird, considering it's a
/// _connectionless_ protocol, but in mirrord we use a _fakeish_ connection mechanism
/// when dealing with outgoing udp traffic.
Connect(LayerConnect),

/// Write data to the remote address the agent is `connect`ed to.
///
/// There's no `Read` message, as we're calling `read` in the agent, and we send
/// a [`DaemonUdpOutgoing::Read`] message in case we get some data from this connection.
Write(LayerWrite),

/// The layer closed the connection, this message syncs up the agent, closing it
/// over there as well.
///
/// Connections in the agent may be closed in other ways, such as when an error happens
/// when reading or writing. Which means that this message is not the only way of
/// closing outgoing udp connections.
Close(LayerClose),
}

#[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)]
pub enum DaemonUdpOutgoing {
/// The agent attempted a connection to the remote address specified by
/// [`LayerUdpOutgoing::Connect`], and it might've been successful or not.
///
/// See the docs for [`LayerUdpOutgoing::Connect`] for a bit more information on the
/// weird idea of `connect` and udp in mirrord.
Connect(RemoteResult<DaemonConnect>),

/// Read data from the connection.
///
/// There's no `Write` message, as `write`s come from the user (layer). The agent sending
/// a `write` to the layer like this would make no sense, since it could just `write` it
/// to the remote connection itself.
Read(RemoteResult<DaemonRead>),

/// Tell the layer that this connection has been `close`d, either by a request from
/// the user with [`LayerUdpOutgoing::Close`], or from some error in the agent when
/// writing or reading from the connection.
Close(ConnectionId),
}

0 comments on commit df0a007

Please sign in to comment.