Skip to content

Commit

Permalink
move metrics to modules
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Dec 19, 2024
1 parent 0c91441 commit c17482c
Show file tree
Hide file tree
Showing 10 changed files with 289 additions and 274 deletions.
11 changes: 0 additions & 11 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -859,17 +859,6 @@ pub async fn main() -> Result<()> {

let args = cli::parse_args();

// TODO(alex) [high]: Could start metrics from here, as the agent itself has 2
// different starting points. So start task here, and pass comms to both.
//
// CANNOT `bind` anything before `start_agent`, we might hit addrinuse.
// let metrics = kameo::spawn(MetricsActor::default());
// let listener = TcpListener::bind("0.0.0.0:0")
// .await
// .map_err(AgentError::from)
// .inspect_err(|fail| tracing::error!(?fail, "Generic listener!"))
// .inspect(|s| tracing::info!(?s, "Listening"))?;

let agent_result = if args.mode.is_targetless()
|| (std::env::var(IPTABLE_PREROUTING_ENV).is_ok()
&& std::env::var(IPTABLE_MESH_ENV).is_ok()
Expand Down
5 changes: 4 additions & 1 deletion mirrord/agent/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use tracing::{error, trace, Level};

use crate::{
error::Result,
metrics::{MetricsActor, MetricsDecFd, MetricsIncFd},
metrics::{
file_ops::{MetricsDecFd, MetricsIncFd},
MetricsActor,
},
};

#[derive(Debug)]
Expand Down
262 changes: 7 additions & 255 deletions mirrord/agent/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ use tracing::Level;

use crate::error::AgentError;

pub(crate) mod file_ops;
pub(crate) mod incoming_traffic;
pub(crate) mod outgoing_traffic;

#[derive(Error, Debug)]
pub(crate) enum MetricsError {
#[error(transparent)]
Expand All @@ -34,8 +38,7 @@ impl IntoResponse for MetricsError {
}
}

#[tracing::instrument(level = Level::INFO, skip(prometheus_metrics), ret, err)]
#[axum::debug_handler]
#[tracing::instrument(level = Level::TRACE, skip(prometheus_metrics), ret, err)]
async fn get_metrics(
metrics: Extension<ActorRef<MetricsActor>>,
prometheus_metrics: Extension<PrometheusMetrics>,
Expand Down Expand Up @@ -171,7 +174,7 @@ impl MetricsActor {
impl Actor for MetricsActor {
type Mailbox = UnboundedMailbox<Self>;

#[tracing::instrument(level = Level::INFO, skip_all, ret ,err)]
#[tracing::instrument(level = Level::TRACE, skip_all, ret ,err)]
async fn on_start(&mut self, metrics: ActorRef<Self>) -> Result<(), BoxError> {
if self.enabled {
let prometheus_metrics = PrometheusMetrics::new()?;
Expand All @@ -198,35 +201,6 @@ impl Actor for MetricsActor {
}
}

pub(crate) struct MetricsIncFd;
pub(crate) struct MetricsDecFd;

pub(crate) struct MetricsIncMirrorPortSubscription;
pub(crate) struct MetricsDecMirrorPortSubscription;

pub(crate) struct MetricsIncMirrorConnectionSubscription;
pub(crate) struct MetricsDecMirrorConnectionSubscription;

pub(crate) struct MetricsIncStealPortSubscription {
pub(crate) filtered: bool,
}
pub(crate) struct MetricsDecStealPortSubscription {
pub(crate) filtered: bool,
}

pub(crate) struct MetricsDecStealPortSubscriptionMany {
pub(crate) removed_subscriptions: Vec<bool>,
}

pub(crate) struct MetricsIncStealConnectionSubscription;
pub(crate) struct MetricsDecStealConnectionSubscription;

pub(crate) struct MetricsIncTcpOutgoingConnection;
pub(crate) struct MetricsDecTcpOutgoingConnection;

pub(crate) struct MetricsIncUdpOutgoingConnection;
pub(crate) struct MetricsDecUdpOutgoingConnection;

pub(crate) struct MetricsGetAll;

#[derive(Reply, Serialize)]
Expand All @@ -240,232 +214,10 @@ pub(crate) struct MetricsGetAllReply {
tcp_outgoing_connection_count: u64,
udp_outgoing_connection_count: u64,
}

impl Message<MetricsIncFd> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncFd,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.open_fd_count += 1;
}
}

impl Message<MetricsDecFd> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecFd,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.open_fd_count = self.open_fd_count.saturating_sub(1);
}
}

impl Message<MetricsIncMirrorPortSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncMirrorPortSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.mirror_port_subscription_count += 1;
}
}

impl Message<MetricsDecMirrorPortSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecMirrorPortSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.mirror_port_subscription_count = self.mirror_port_subscription_count.saturating_sub(1);
}
}

impl Message<MetricsIncStealPortSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
MetricsIncStealPortSubscription { filtered }: MetricsIncStealPortSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
if filtered {
self.steal_filtered_port_subscription_count += 1;
} else {
self.steal_unfiltered_port_subscription_count += 1;
}
}
}

impl Message<MetricsDecStealPortSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
MetricsDecStealPortSubscription { filtered }: MetricsDecStealPortSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
if filtered {
self.steal_filtered_port_subscription_count = self
.steal_filtered_port_subscription_count
.saturating_sub(1);
} else {
self.steal_unfiltered_port_subscription_count = self
.steal_unfiltered_port_subscription_count
.saturating_sub(1);
}
}
}

impl Message<MetricsDecStealPortSubscriptionMany> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
MetricsDecStealPortSubscriptionMany {
removed_subscriptions,
}: MetricsDecStealPortSubscriptionMany,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
for filtered in removed_subscriptions {
if filtered {
self.steal_filtered_port_subscription_count = self
.steal_filtered_port_subscription_count
.saturating_sub(1);
} else {
self.steal_unfiltered_port_subscription_count = self
.steal_unfiltered_port_subscription_count
.saturating_sub(1);
}
}
}
}

impl Message<MetricsIncStealConnectionSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncStealConnectionSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.steal_connection_subscription_count += 1;
}
}

impl Message<MetricsDecStealConnectionSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecStealConnectionSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.steal_connection_subscription_count =
self.steal_connection_subscription_count.saturating_sub(1);
}
}

impl Message<MetricsIncMirrorConnectionSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncMirrorConnectionSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.mirror_connection_subscription_count += 1;
}
}

impl Message<MetricsDecMirrorConnectionSubscription> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecMirrorConnectionSubscription,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.mirror_connection_subscription_count =
self.mirror_connection_subscription_count.saturating_sub(1);
}
}

impl Message<MetricsIncTcpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncTcpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.tcp_outgoing_connection_count += 1;
}
}

impl Message<MetricsDecTcpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecTcpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.tcp_outgoing_connection_count = self.tcp_outgoing_connection_count.saturating_sub(1);
}
}

impl Message<MetricsIncUdpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncUdpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.udp_outgoing_connection_count += 1;
}
}

impl Message<MetricsDecUdpOutgoingConnection> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecUdpOutgoingConnection,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.udp_outgoing_connection_count = self.udp_outgoing_connection_count.saturating_sub(1);
}
}

impl Message<MetricsGetAll> for MetricsActor {
type Reply = MetricsGetAllReply;

#[tracing::instrument(level = Level::INFO, skip_all)]
#[tracing::instrument(level = Level::TRACE, skip_all)]
async fn handle(
&mut self,
_: MetricsGetAll,
Expand Down
33 changes: 33 additions & 0 deletions mirrord/agent/src/metrics/file_ops.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
use kameo::message::{Context, Message};
use tracing::Level;

use crate::metrics::MetricsActor;

pub(crate) struct MetricsIncFd;
pub(crate) struct MetricsDecFd;

impl Message<MetricsIncFd> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsIncFd,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.open_fd_count += 1;
}
}

impl Message<MetricsDecFd> for MetricsActor {
type Reply = ();

#[tracing::instrument(level = Level::INFO, skip_all)]
async fn handle(
&mut self,
_: MetricsDecFd,
_ctx: Context<'_, Self, Self::Reply>,
) -> Self::Reply {
self.open_fd_count = self.open_fd_count.saturating_sub(1);
}
}
Loading

0 comments on commit c17482c

Please sign in to comment.