diff --git a/mirrord/agent/src/entrypoint.rs b/mirrord/agent/src/entrypoint.rs index 218ad922c29..8bbdaa0bd62 100644 --- a/mirrord/agent/src/entrypoint.rs +++ b/mirrord/agent/src/entrypoint.rs @@ -859,17 +859,6 @@ pub async fn main() -> Result<()> { let args = cli::parse_args(); - // TODO(alex) [high]: Could start metrics from here, as the agent itself has 2 - // different starting points. So start task here, and pass comms to both. - // - // CANNOT `bind` anything before `start_agent`, we might hit addrinuse. - // let metrics = kameo::spawn(MetricsActor::default()); - // let listener = TcpListener::bind("0.0.0.0:0") - // .await - // .map_err(AgentError::from) - // .inspect_err(|fail| tracing::error!(?fail, "Generic listener!")) - // .inspect(|s| tracing::info!(?s, "Listening"))?; - let agent_result = if args.mode.is_targetless() || (std::env::var(IPTABLE_PREROUTING_ENV).is_ok() && std::env::var(IPTABLE_MESH_ENV).is_ok() diff --git a/mirrord/agent/src/file.rs b/mirrord/agent/src/file.rs index ab6b0407d75..0808cc1d382 100644 --- a/mirrord/agent/src/file.rs +++ b/mirrord/agent/src/file.rs @@ -17,7 +17,10 @@ use tracing::{error, trace, Level}; use crate::{ error::Result, - metrics::{MetricsActor, MetricsDecFd, MetricsIncFd}, + metrics::{ + file_ops::{MetricsDecFd, MetricsIncFd}, + MetricsActor, + }, }; #[derive(Debug)] diff --git a/mirrord/agent/src/metrics.rs b/mirrord/agent/src/metrics.rs index a00736ef7ee..babba1aa245 100644 --- a/mirrord/agent/src/metrics.rs +++ b/mirrord/agent/src/metrics.rs @@ -14,6 +14,10 @@ use tracing::Level; use crate::error::AgentError; +pub(crate) mod file_ops; +pub(crate) mod incoming_traffic; +pub(crate) mod outgoing_traffic; + #[derive(Error, Debug)] pub(crate) enum MetricsError { #[error(transparent)] @@ -34,8 +38,7 @@ impl IntoResponse for MetricsError { } } -#[tracing::instrument(level = Level::INFO, skip(prometheus_metrics), ret, err)] -#[axum::debug_handler] +#[tracing::instrument(level = Level::TRACE, skip(prometheus_metrics), ret, err)] async fn get_metrics( metrics: Extension>, prometheus_metrics: Extension, @@ -171,7 +174,7 @@ impl MetricsActor { impl Actor for MetricsActor { type Mailbox = UnboundedMailbox; - #[tracing::instrument(level = Level::INFO, skip_all, ret ,err)] + #[tracing::instrument(level = Level::TRACE, skip_all, ret ,err)] async fn on_start(&mut self, metrics: ActorRef) -> Result<(), BoxError> { if self.enabled { let prometheus_metrics = PrometheusMetrics::new()?; @@ -198,35 +201,6 @@ impl Actor for MetricsActor { } } -pub(crate) struct MetricsIncFd; -pub(crate) struct MetricsDecFd; - -pub(crate) struct MetricsIncMirrorPortSubscription; -pub(crate) struct MetricsDecMirrorPortSubscription; - -pub(crate) struct MetricsIncMirrorConnectionSubscription; -pub(crate) struct MetricsDecMirrorConnectionSubscription; - -pub(crate) struct MetricsIncStealPortSubscription { - pub(crate) filtered: bool, -} -pub(crate) struct MetricsDecStealPortSubscription { - pub(crate) filtered: bool, -} - -pub(crate) struct MetricsDecStealPortSubscriptionMany { - pub(crate) removed_subscriptions: Vec, -} - -pub(crate) struct MetricsIncStealConnectionSubscription; -pub(crate) struct MetricsDecStealConnectionSubscription; - -pub(crate) struct MetricsIncTcpOutgoingConnection; -pub(crate) struct MetricsDecTcpOutgoingConnection; - -pub(crate) struct MetricsIncUdpOutgoingConnection; -pub(crate) struct MetricsDecUdpOutgoingConnection; - pub(crate) struct MetricsGetAll; #[derive(Reply, Serialize)] @@ -240,232 +214,10 @@ pub(crate) struct MetricsGetAllReply { tcp_outgoing_connection_count: u64, udp_outgoing_connection_count: u64, } - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncFd, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.open_fd_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecFd, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.open_fd_count = self.open_fd_count.saturating_sub(1); - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncMirrorPortSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.mirror_port_subscription_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecMirrorPortSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.mirror_port_subscription_count = self.mirror_port_subscription_count.saturating_sub(1); - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - MetricsIncStealPortSubscription { filtered }: MetricsIncStealPortSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - if filtered { - self.steal_filtered_port_subscription_count += 1; - } else { - self.steal_unfiltered_port_subscription_count += 1; - } - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - MetricsDecStealPortSubscription { filtered }: MetricsDecStealPortSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - if filtered { - self.steal_filtered_port_subscription_count = self - .steal_filtered_port_subscription_count - .saturating_sub(1); - } else { - self.steal_unfiltered_port_subscription_count = self - .steal_unfiltered_port_subscription_count - .saturating_sub(1); - } - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - MetricsDecStealPortSubscriptionMany { - removed_subscriptions, - }: MetricsDecStealPortSubscriptionMany, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - for filtered in removed_subscriptions { - if filtered { - self.steal_filtered_port_subscription_count = self - .steal_filtered_port_subscription_count - .saturating_sub(1); - } else { - self.steal_unfiltered_port_subscription_count = self - .steal_unfiltered_port_subscription_count - .saturating_sub(1); - } - } - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncStealConnectionSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.steal_connection_subscription_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecStealConnectionSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.steal_connection_subscription_count = - self.steal_connection_subscription_count.saturating_sub(1); - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncMirrorConnectionSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.mirror_connection_subscription_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecMirrorConnectionSubscription, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.mirror_connection_subscription_count = - self.mirror_connection_subscription_count.saturating_sub(1); - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncTcpOutgoingConnection, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.tcp_outgoing_connection_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecTcpOutgoingConnection, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.tcp_outgoing_connection_count = self.tcp_outgoing_connection_count.saturating_sub(1); - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsIncUdpOutgoingConnection, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.udp_outgoing_connection_count += 1; - } -} - -impl Message for MetricsActor { - type Reply = (); - - #[tracing::instrument(level = Level::INFO, skip_all)] - async fn handle( - &mut self, - _: MetricsDecUdpOutgoingConnection, - _ctx: Context<'_, Self, Self::Reply>, - ) -> Self::Reply { - self.udp_outgoing_connection_count = self.udp_outgoing_connection_count.saturating_sub(1); - } -} - impl Message for MetricsActor { type Reply = MetricsGetAllReply; - #[tracing::instrument(level = Level::INFO, skip_all)] + #[tracing::instrument(level = Level::TRACE, skip_all)] async fn handle( &mut self, _: MetricsGetAll, diff --git a/mirrord/agent/src/metrics/file_ops.rs b/mirrord/agent/src/metrics/file_ops.rs new file mode 100644 index 00000000000..ed040442604 --- /dev/null +++ b/mirrord/agent/src/metrics/file_ops.rs @@ -0,0 +1,33 @@ +use kameo::message::{Context, Message}; +use tracing::Level; + +use crate::metrics::MetricsActor; + +pub(crate) struct MetricsIncFd; +pub(crate) struct MetricsDecFd; + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncFd, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.open_fd_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecFd, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.open_fd_count = self.open_fd_count.saturating_sub(1); + } +} diff --git a/mirrord/agent/src/metrics/incoming_traffic.rs b/mirrord/agent/src/metrics/incoming_traffic.rs new file mode 100644 index 00000000000..e8de28d249a --- /dev/null +++ b/mirrord/agent/src/metrics/incoming_traffic.rs @@ -0,0 +1,167 @@ +use kameo::message::{Context, Message}; +use tracing::Level; + +use crate::metrics::MetricsActor; + +pub(crate) struct MetricsIncMirrorPortSubscription; +pub(crate) struct MetricsDecMirrorPortSubscription; + +pub(crate) struct MetricsIncMirrorConnectionSubscription; +pub(crate) struct MetricsDecMirrorConnectionSubscription; + +pub(crate) struct MetricsIncStealPortSubscription { + pub(crate) filtered: bool, +} +pub(crate) struct MetricsDecStealPortSubscription { + pub(crate) filtered: bool, +} + +pub(crate) struct MetricsDecStealPortSubscriptionMany { + pub(crate) removed_subscriptions: Vec, +} + +pub(crate) struct MetricsIncStealConnectionSubscription; +pub(crate) struct MetricsDecStealConnectionSubscription; + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncMirrorPortSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.mirror_port_subscription_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecMirrorPortSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.mirror_port_subscription_count = self.mirror_port_subscription_count.saturating_sub(1); + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + MetricsIncStealPortSubscription { filtered }: MetricsIncStealPortSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + if filtered { + self.steal_filtered_port_subscription_count += 1; + } else { + self.steal_unfiltered_port_subscription_count += 1; + } + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + MetricsDecStealPortSubscription { filtered }: MetricsDecStealPortSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + if filtered { + self.steal_filtered_port_subscription_count = self + .steal_filtered_port_subscription_count + .saturating_sub(1); + } else { + self.steal_unfiltered_port_subscription_count = self + .steal_unfiltered_port_subscription_count + .saturating_sub(1); + } + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + MetricsDecStealPortSubscriptionMany { + removed_subscriptions, + }: MetricsDecStealPortSubscriptionMany, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + for filtered in removed_subscriptions { + if filtered { + self.steal_filtered_port_subscription_count = self + .steal_filtered_port_subscription_count + .saturating_sub(1); + } else { + self.steal_unfiltered_port_subscription_count = self + .steal_unfiltered_port_subscription_count + .saturating_sub(1); + } + } + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncStealConnectionSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.steal_connection_subscription_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecStealConnectionSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.steal_connection_subscription_count = + self.steal_connection_subscription_count.saturating_sub(1); + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncMirrorConnectionSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.mirror_connection_subscription_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecMirrorConnectionSubscription, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.mirror_connection_subscription_count = + self.mirror_connection_subscription_count.saturating_sub(1); + } +} diff --git a/mirrord/agent/src/metrics/outgoing_traffic.rs b/mirrord/agent/src/metrics/outgoing_traffic.rs new file mode 100644 index 00000000000..d9d21a591ca --- /dev/null +++ b/mirrord/agent/src/metrics/outgoing_traffic.rs @@ -0,0 +1,62 @@ +use kameo::message::{Context, Message}; +use tracing::Level; + +use crate::metrics::MetricsActor; + +pub(crate) struct MetricsIncTcpOutgoingConnection; +pub(crate) struct MetricsDecTcpOutgoingConnection; + +pub(crate) struct MetricsIncUdpOutgoingConnection; +pub(crate) struct MetricsDecUdpOutgoingConnection; + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncTcpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.tcp_outgoing_connection_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecTcpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.tcp_outgoing_connection_count = self.tcp_outgoing_connection_count.saturating_sub(1); + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsIncUdpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.udp_outgoing_connection_count += 1; + } +} + +impl Message for MetricsActor { + type Reply = (); + + #[tracing::instrument(level = Level::INFO, skip_all)] + async fn handle( + &mut self, + _: MetricsDecUdpOutgoingConnection, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.udp_outgoing_connection_count = self.udp_outgoing_connection_count.saturating_sub(1); + } +} diff --git a/mirrord/agent/src/outgoing.rs b/mirrord/agent/src/outgoing.rs index 7e171fd1126..e9f5e071182 100644 --- a/mirrord/agent/src/outgoing.rs +++ b/mirrord/agent/src/outgoing.rs @@ -20,7 +20,10 @@ use tracing::Level; use crate::{ error::Result, - metrics::{MetricsActor, MetricsDecTcpOutgoingConnection, MetricsIncTcpOutgoingConnection}, + metrics::{ + outgoing_traffic::{MetricsDecTcpOutgoingConnection, MetricsIncTcpOutgoingConnection}, + MetricsActor, + }, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, }; diff --git a/mirrord/agent/src/outgoing/udp.rs b/mirrord/agent/src/outgoing/udp.rs index f191bc63911..d60d98e895e 100644 --- a/mirrord/agent/src/outgoing/udp.rs +++ b/mirrord/agent/src/outgoing/udp.rs @@ -26,7 +26,7 @@ use tracing::{debug, trace, warn}; use super::MetricsActor; use crate::{ error::Result, - metrics::{MetricsDecUdpOutgoingConnection, MetricsIncUdpOutgoingConnection}, + metrics::outgoing_traffic::{MetricsDecUdpOutgoingConnection, MetricsIncUdpOutgoingConnection}, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, }; diff --git a/mirrord/agent/src/sniffer/api.rs b/mirrord/agent/src/sniffer/api.rs index 5cd00639a7a..1ea3bd32ea9 100644 --- a/mirrord/agent/src/sniffer/api.rs +++ b/mirrord/agent/src/sniffer/api.rs @@ -19,8 +19,11 @@ use super::messages::{SniffedConnection, SnifferCommand, SnifferCommandInner}; use crate::{ error::AgentError, metrics::{ - MetricsActor, MetricsDecMirrorConnectionSubscription, MetricsDecMirrorPortSubscription, - MetricsIncMirrorPortSubscription, + incoming_traffic::{ + MetricsDecMirrorConnectionSubscription, MetricsDecMirrorPortSubscription, + MetricsIncMirrorPortSubscription, + }, + MetricsActor, }, util::ClientId, watched_task::TaskStatus, diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index ea695c5bcfc..a75baaeaa04 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -35,9 +35,12 @@ use tracing::{trace, warn, Level}; use crate::{ error::{AgentError, Result}, metrics::{ - MetricsActor, MetricsDecStealConnectionSubscription, MetricsDecStealPortSubscription, - MetricsDecStealPortSubscriptionMany, MetricsIncStealConnectionSubscription, - MetricsIncStealPortSubscription, + incoming_traffic::{ + MetricsDecStealConnectionSubscription, MetricsDecStealPortSubscription, + MetricsDecStealPortSubscriptionMany, MetricsIncStealConnectionSubscription, + MetricsIncStealPortSubscription, + }, + MetricsActor, }, steal::{ connections::{