From df0a00727d5b65e34d0d591536abc1228b832a98 Mon Sep 17 00:00:00 2001 From: meowjesty Date: Thu, 19 Dec 2024 17:45:47 -0300 Subject: [PATCH] udpoutgoing metrics --- mirrord/agent/src/entrypoint.rs | 2 +- mirrord/agent/src/metrics.rs | 42 ++++++++++++++++++++++++++++ mirrord/agent/src/outgoing/udp.rs | 42 ++++++++++++++++++++++++---- mirrord/protocol/src/codec.rs | 6 +++- mirrord/protocol/src/outgoing/udp.rs | 36 ++++++++++++++++++++++++ 5 files changed, 120 insertions(+), 8 deletions(-) diff --git a/mirrord/agent/src/entrypoint.rs b/mirrord/agent/src/entrypoint.rs index 52355b55abb..0b8b5d8573e 100644 --- a/mirrord/agent/src/entrypoint.rs +++ b/mirrord/agent/src/entrypoint.rs @@ -233,7 +233,7 @@ impl ClientConnectionHandler { let dns_api = Self::create_dns_api(bg_tasks.dns); let tcp_outgoing_api = TcpOutgoingApi::new(pid, state.metrics.clone()); - let udp_outgoing_api = UdpOutgoingApi::new(pid); + let udp_outgoing_api = UdpOutgoingApi::new(pid, state.metrics.clone()); let client_handler = Self { id, diff --git a/mirrord/agent/src/metrics.rs b/mirrord/agent/src/metrics.rs index abc579dcc9f..a00736ef7ee 100644 --- a/mirrord/agent/src/metrics.rs +++ b/mirrord/agent/src/metrics.rs @@ -50,6 +50,7 @@ async fn get_metrics( steal_unfiltered_port_subscription_count, steal_connection_subscription_count, tcp_outgoing_connection_count, + udp_outgoing_connection_count, } = metrics.ask(MetricsGetAll).await?; prometheus_metrics.open_fd_count.set(open_fd_count as i64); @@ -78,6 +79,10 @@ async fn get_metrics( .tcp_outgoing_connection_count .set(tcp_outgoing_connection_count as i64); + prometheus_metrics + .udp_outgoing_connection_count + .set(udp_outgoing_connection_count as i64); + let metric_families = prometheus::gather(); let mut buffer = Vec::new(); @@ -97,6 +102,7 @@ struct PrometheusMetrics { steal_unfiltered_port_subscription_count: GenericGauge, steal_connection_subscription_count: GenericGauge, tcp_outgoing_connection_count: GenericGauge, + udp_outgoing_connection_count: GenericGauge, } impl PrometheusMetrics { @@ -132,6 +138,10 @@ impl PrometheusMetrics { "mirrord_agent_tcp_outgoing_connection_count", "amount of tcp outgoing connections in mirrord-agent" )?, + udp_outgoing_connection_count: register_int_gauge!( + "mirrord_agent_udp_outgoing_connection_count", + "amount of udp outgoing connections in mirrord-agent" + )?, }) } } @@ -146,6 +156,7 @@ pub(crate) struct MetricsActor { steal_unfiltered_port_subscription_count: u64, steal_connection_subscription_count: u64, tcp_outgoing_connection_count: u64, + udp_outgoing_connection_count: u64, } impl MetricsActor { @@ -213,6 +224,9 @@ pub(crate) struct MetricsDecStealConnectionSubscription; pub(crate) struct MetricsIncTcpOutgoingConnection; pub(crate) struct MetricsDecTcpOutgoingConnection; +pub(crate) struct MetricsIncUdpOutgoingConnection; +pub(crate) struct MetricsDecUdpOutgoingConnection; + pub(crate) struct MetricsGetAll; #[derive(Reply, Serialize)] @@ -224,6 +238,7 @@ pub(crate) struct MetricsGetAllReply { steal_unfiltered_port_subscription_count: u64, steal_connection_subscription_count: u64, tcp_outgoing_connection_count: u64, + udp_outgoing_connection_count: u64, } impl Message for MetricsActor { @@ -421,6 +436,32 @@ impl Message for MetricsActor { } } +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncUdpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.udp_outgoing_connection_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecUdpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.udp_outgoing_connection_count = self.udp_outgoing_connection_count.saturating_sub(1); + } +} + impl Message for MetricsActor { type Reply = MetricsGetAllReply; @@ -438,6 +479,7 @@ impl Message for MetricsActor { steal_unfiltered_port_subscription_count: self.steal_unfiltered_port_subscription_count, steal_connection_subscription_count: self.steal_connection_subscription_count, tcp_outgoing_connection_count: self.tcp_outgoing_connection_count, + udp_outgoing_connection_count: self.udp_outgoing_connection_count, } } } diff --git a/mirrord/agent/src/outgoing/udp.rs b/mirrord/agent/src/outgoing/udp.rs index b6baa5e537e..f191bc63911 100644 --- a/mirrord/agent/src/outgoing/udp.rs +++ b/mirrord/agent/src/outgoing/udp.rs @@ -9,6 +9,7 @@ use futures::{ prelude::*, stream::{SplitSink, SplitStream}, }; +use kameo::actor::ActorRef; use mirrord_protocol::{ outgoing::{udp::*, *}, ConnectionId, ResponseError, @@ -22,8 +23,10 @@ use tokio::{ use tokio_util::{codec::BytesCodec, udp::UdpFramed}; use tracing::{debug, trace, warn}; +use super::MetricsActor; use crate::{ error::Result, + metrics::{MetricsDecUdpOutgoingConnection, MetricsIncUdpOutgoingConnection}, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, }; @@ -71,12 +74,14 @@ async fn connect(remote_address: SocketAddr) -> Result impl UdpOutgoingApi { const TASK_NAME: &'static str = "UdpOutgoing"; - pub(crate) fn new(pid: Option) -> Self { + pub(crate) fn new(pid: Option, metrics: ActorRef) -> Self { let (layer_tx, layer_rx) = mpsc::channel(1000); let (daemon_tx, daemon_rx) = mpsc::channel(1000); - let watched_task = - WatchedTask::new(Self::TASK_NAME, Self::interceptor_task(layer_rx, daemon_tx)); + let watched_task = WatchedTask::new( + Self::TASK_NAME, + Self::interceptor_task(layer_rx, daemon_tx, metrics), + ); let task_status = watched_task.status(); let task = run_thread_in_namespace( @@ -101,6 +106,7 @@ impl UdpOutgoingApi { async fn interceptor_task( mut layer_rx: Receiver, daemon_tx: Sender, + metrics: ActorRef, ) -> Result<()> { let mut connection_ids = 0..=ConnectionId::MAX; @@ -136,11 +142,14 @@ impl UdpOutgoingApi { .ok_or_else(|| ResponseError::IdsExhausted("connect".into()))?; debug!("interceptor_task -> mirror_socket {:#?}", mirror_socket); + let peer_address = mirror_socket.peer_addr()?; let local_address = mirror_socket.local_addr()?; let local_address = SocketAddress::Ip(local_address); + let framed = UdpFramed::new(mirror_socket, BytesCodec::new()); debug!("interceptor_task -> framed {:#?}", framed); + let (sink, stream): ( SplitSink, (BytesMut, SocketAddr)>, SplitStream>, @@ -158,7 +167,13 @@ impl UdpOutgoingApi { let daemon_message = DaemonUdpOutgoing::Connect(daemon_connect); debug!("interceptor_task -> daemon_message {:#?}", daemon_message); - daemon_tx.send(daemon_message).await? + + daemon_tx.send(daemon_message).await?; + + let _ = metrics + .tell(MetricsIncUdpOutgoingConnection) + .await + .inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!")); } // [user] -> [layer] -> [agent] -> [remote] // `user` wrote some message to the remote host. @@ -183,7 +198,12 @@ impl UdpOutgoingApi { readers.remove(&connection_id); let daemon_message = DaemonUdpOutgoing::Close(connection_id); - daemon_tx.send(daemon_message).await? + daemon_tx.send(daemon_message).await?; + + let _ = metrics + .tell(MetricsDecUdpOutgoingConnection) + .await + .inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!")); } } // [layer] -> [agent] @@ -191,6 +211,11 @@ impl UdpOutgoingApi { LayerUdpOutgoing::Close(LayerClose { ref connection_id }) => { writers.remove(connection_id); readers.remove(connection_id); + + let _ = metrics + .tell(MetricsDecUdpOutgoingConnection) + .await + .inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!")); } } } @@ -216,7 +241,12 @@ impl UdpOutgoingApi { readers.remove(&connection_id); let daemon_message = DaemonUdpOutgoing::Close(connection_id); - daemon_tx.send(daemon_message).await? + daemon_tx.send(daemon_message).await?; + + let _ = metrics + .tell(MetricsDecUdpOutgoingConnection) + .await + .inspect_err(|fail| tracing::warn!(%fail, "agent metrics failure!")); } } } diff --git a/mirrord/protocol/src/codec.rs b/mirrord/protocol/src/codec.rs index 23b90230d35..e2ed6532e3b 100644 --- a/mirrord/protocol/src/codec.rs +++ b/mirrord/protocol/src/codec.rs @@ -104,12 +104,16 @@ pub enum ClientMessage { /// These are the messages used by the `steal` feature, and handled by the `TcpStealerApi` in /// the agent. TcpSteal(LayerTcpSteal), - // TODO(alex) [high]: Outgoing is next! /// TCP outgoing message. /// /// These are the messages used by the `outgoing` feature (tcp), and handled by the /// `TcpOutgoingApi` in the agent. TcpOutgoing(LayerTcpOutgoing), + + /// UDP outgoing message. + /// + /// These are the messages used by the `outgoing` feature (udp), and handled by the + /// `UdpOutgoingApi` in the agent. UdpOutgoing(LayerUdpOutgoing), FileRequest(FileRequest), GetEnvVarsRequest(GetEnvVarsRequest), diff --git a/mirrord/protocol/src/outgoing/udp.rs b/mirrord/protocol/src/outgoing/udp.rs index 02b4d97f830..f58378beeea 100644 --- a/mirrord/protocol/src/outgoing/udp.rs +++ b/mirrord/protocol/src/outgoing/udp.rs @@ -3,14 +3,50 @@ use crate::RemoteResult; #[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] pub enum LayerUdpOutgoing { + /// User is interested in connecting via udp to some remote address, specified in + /// [`LayerConnect`]. + /// + /// The layer will get a mirrord managed address that it'll `connect` to, meanwhile + /// in the agent we `connect` to the actual remote address. + /// + /// Saying that we have an _udp connection_ is a bit weird, considering it's a + /// _connectionless_ protocol, but in mirrord we use a _fakeish_ connection mechanism + /// when dealing with outgoing udp traffic. Connect(LayerConnect), + + /// Write data to the remote address the agent is `connect`ed to. + /// + /// There's no `Read` message, as we're calling `read` in the agent, and we send + /// a [`DaemonUdpOutgoing::Read`] message in case we get some data from this connection. Write(LayerWrite), + + /// The layer closed the connection, this message syncs up the agent, closing it + /// over there as well. + /// + /// Connections in the agent may be closed in other ways, such as when an error happens + /// when reading or writing. Which means that this message is not the only way of + /// closing outgoing udp connections. Close(LayerClose), } #[derive(Encode, Decode, Debug, PartialEq, Eq, Clone)] pub enum DaemonUdpOutgoing { + /// The agent attempted a connection to the remote address specified by + /// [`LayerUdpOutgoing::Connect`], and it might've been successful or not. + /// + /// See the docs for [`LayerUdpOutgoing::Connect`] for a bit more information on the + /// weird idea of `connect` and udp in mirrord. Connect(RemoteResult), + + /// Read data from the connection. + /// + /// There's no `Write` message, as `write`s come from the user (layer). The agent sending + /// a `write` to the layer like this would make no sense, since it could just `write` it + /// to the remote connection itself. Read(RemoteResult), + + /// Tell the layer that this connection has been `close`d, either by a request from + /// the user with [`LayerUdpOutgoing::Close`], or from some error in the agent when + /// writing or reading from the connection. Close(ConnectionId), }