Skip to content

Commit

Permalink
gate metrics behind config
Browse files Browse the repository at this point in the history
  • Loading branch information
meowjesty committed Dec 20, 2024
1 parent 8032c1c commit be6b181
Show file tree
Hide file tree
Showing 31 changed files with 220 additions and 200 deletions.
30 changes: 0 additions & 30 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion mirrord/agent/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ rustls.workspace = true
envy = "0.4"
socket2.workspace = true
prometheus = { version = "0.13", features = ["process"] }
kameo = { git = "https://github.com/tqwewe/kameo", branch = "main" }
axum = { version = "0.7", features = ["macros"] }
axum-server = "0.7"

Expand Down
16 changes: 15 additions & 1 deletion mirrord/agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@ mirrord-agent is distributed as a container image (currently only x86) that is p

## Enabling prometheus metrics

TODO(alex) [mid]: Talk how to enable it from env whatever.
To start the metrics server, you'll need to add this config to your `mirrord.json`:

```json
{
"agent": {
"metrics": "0.0.0.0:9000",
"annotations": {
"prometheus.io/scrape": "true",
"prometheus.io/port": "9000"
}
}
```

Remember to change the `port` in both `metrics` and `annotations`, they have to match,
otherwise prometheus will try to scrape on `port: 80` or other commonly used ports.

### Installing prometheus

Expand Down
8 changes: 7 additions & 1 deletion mirrord/agent/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#![deny(missing_docs)]

use clap::{Parser, Subcommand};
use mirrord_protocol::{MeshVendor, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV};
use mirrord_protocol::{
MeshVendor, AGENT_METRICS_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV,
};

const DEFAULT_RUNTIME: &str = "containerd";

Expand All @@ -26,6 +28,10 @@ pub struct Args {
#[arg(short = 'i', long, env = AGENT_NETWORK_INTERFACE_ENV)]
pub network_interface: Option<String>,

/// Controls whether metrics are enabled, and the address to set up the metrics server.
#[arg(long, env = AGENT_METRICS_ENV)]
pub metrics: Option<String>,

/// Return an error after accepting the first client connection, in order to test agent error
/// cleanup.
///
Expand Down
4 changes: 2 additions & 2 deletions mirrord/agent/src/container_handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{collections::HashMap, sync::Arc};

use crate::{
error::Result,
error::AgentResult,
runtime::{Container, ContainerInfo, ContainerRuntime},
};

Expand All @@ -22,7 +22,7 @@ pub(crate) struct ContainerHandle(Arc<Inner>);
impl ContainerHandle {
/// Retrieve info about the container and initialize this struct.
#[tracing::instrument(level = "trace")]
pub(crate) async fn new(container: Container) -> Result<Self> {
pub(crate) async fn new(container: Container) -> AgentResult<Self> {
let ContainerInfo { pid, env: raw_env } = container.get_info().await?;

let inner = Inner { pid, raw_env };
Expand Down
10 changes: 5 additions & 5 deletions mirrord/agent/src/dns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use tokio_util::sync::CancellationToken;
use tracing::Level;

use crate::{
error::{AgentError, Result},
error::{AgentError, AgentResult},
watched_task::TaskStatus,
};

Expand Down Expand Up @@ -86,7 +86,7 @@ impl DnsWorker {
// Prepares the `Resolver` after reading some `/etc` DNS files.
//
// We care about logging these errors, at an `error!` level.
let resolver: Result<_, ResponseError> = try {
let resolver: AgentResult<_, ResponseError> = try {
let resolv_conf_path = etc_path.join("resolv.conf");
let hosts_path = etc_path.join("hosts");

Expand Down Expand Up @@ -139,7 +139,7 @@ impl DnsWorker {
pub(crate) async fn run(
mut self,
cancellation_token: CancellationToken,
) -> Result<(), AgentError> {
) -> AgentResult<(), AgentError> {
loop {
tokio::select! {
_ = cancellation_token.cancelled() => break Ok(()),
Expand Down Expand Up @@ -175,7 +175,7 @@ impl DnsApi {
pub(crate) async fn make_request(
&mut self,
request: GetAddrInfoRequest,
) -> Result<(), AgentError> {
) -> AgentResult<(), AgentError> {
let (response_tx, response_rx) = oneshot::channel();

let command = DnsCommand {
Expand All @@ -194,7 +194,7 @@ impl DnsApi {
/// Returns the result of the oldest outstanding DNS request issued with this struct (see
/// [`Self::make_request`]).
#[tracing::instrument(level = Level::TRACE, skip(self), ret, err)]
pub(crate) async fn recv(&mut self) -> Result<GetAddrInfoResponse, AgentError> {
pub(crate) async fn recv(&mut self) -> AgentResult<GetAddrInfoResponse, AgentError> {
let Some(response) = self.responses.next().await else {
return future::pending().await;
};
Expand Down
37 changes: 20 additions & 17 deletions mirrord/agent/src/entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{
client_connection::ClientConnection,
container_handle::ContainerHandle,
dns::DnsApi,
error::{AgentError, Result},
error::{AgentError, AgentResult},
file::FileManager,
outgoing::{TcpOutgoingApi, UdpOutgoingApi},
runtime::get_container,
Expand Down Expand Up @@ -73,7 +73,7 @@ struct State {

impl State {
/// Return [`Err`] if container runtime operations failed.
pub async fn new(args: &Args) -> Result<State> {
pub async fn new(args: &Args) -> AgentResult<State> {
let tls_connector = args
.operator_tls_cert_pem
.clone()
Expand Down Expand Up @@ -213,7 +213,7 @@ impl ClientConnectionHandler {
mut connection: ClientConnection,
bg_tasks: BackgroundTasks,
state: State,
) -> Result<Self> {
) -> AgentResult<Self> {
let pid = state.container_pid();

let file_manager = FileManager::new(pid.or_else(|| state.ephemeral.then_some(1)));
Expand Down Expand Up @@ -274,7 +274,7 @@ impl ClientConnectionHandler {
id: ClientId,
task: BackgroundTask<StealerCommand>,
connection: &mut ClientConnection,
) -> Result<Option<TcpStealerApi>> {
) -> AgentResult<Option<TcpStealerApi>> {
if let BackgroundTask::Running(stealer_status, stealer_sender) = task {
match TcpStealerApi::new(
id,
Expand Down Expand Up @@ -314,7 +314,7 @@ impl ClientConnectionHandler {
///
/// Breaks upon receiver/sender drop.
#[tracing::instrument(level = "trace", skip(self))]
async fn start(mut self, cancellation_token: CancellationToken) -> Result<()> {
async fn start(mut self, cancellation_token: CancellationToken) -> AgentResult<()> {
let error = loop {
select! {
message = self.connection.receive() => {
Expand Down Expand Up @@ -390,15 +390,15 @@ impl ClientConnectionHandler {

/// Sends a [`DaemonMessage`] response to the connected client (`mirrord-layer`).
#[tracing::instrument(level = "trace", skip(self))]
async fn respond(&mut self, response: DaemonMessage) -> Result<()> {
async fn respond(&mut self, response: DaemonMessage) -> AgentResult<()> {
self.connection.send(response).await.map_err(Into::into)
}

/// Handles incoming messages from the connected client (`mirrord-layer`).
///
/// Returns `false` if the client disconnected.
#[tracing::instrument(level = Level::TRACE, skip(self), err)]
async fn handle_client_message(&mut self, message: ClientMessage) -> Result<bool> {
async fn handle_client_message(&mut self, message: ClientMessage) -> AgentResult<bool> {
match message {
ClientMessage::FileRequest(req) => {
if let Some(response) = self.file_manager.handle_message(req).await? {
Expand Down Expand Up @@ -490,14 +490,17 @@ impl ClientConnectionHandler {

/// Initializes the agent's [`State`], channels, threads, and runs [`ClientConnectionHandler`]s.
#[tracing::instrument(level = Level::TRACE, ret, err)]
async fn start_agent(args: Args) -> Result<()> {
async fn start_agent(args: Args) -> AgentResult<()> {
trace!("start_agent -> Starting agent with args: {args:?}");

tokio::spawn(async move {
start_metrics()
.await
.inspect_err(|fail| tracing::error!(?fail, "Failed starting metrics server!"))
});
if let Some(metrics_address) = args.metrics.as_ref() {
let address = metrics_address.parse()?;
tokio::spawn(async move {
start_metrics(address)
.await
.inspect_err(|fail| tracing::error!(?fail, "Failed starting metrics server!"))
});
}

let listener = TcpListener::bind(SocketAddrV4::new(
Ipv4Addr::UNSPECIFIED,
Expand Down Expand Up @@ -730,7 +733,7 @@ async fn start_agent(args: Args) -> Result<()> {
Ok(())
}

async fn clear_iptable_chain() -> Result<()> {
async fn clear_iptable_chain() -> AgentResult<()> {
let ipt = new_iptables();

SafeIpTables::load(IPTablesWrapper::from(ipt), false)
Expand All @@ -741,7 +744,7 @@ async fn clear_iptable_chain() -> Result<()> {
Ok(())
}

async fn run_child_agent() -> Result<()> {
async fn run_child_agent() -> AgentResult<()> {
let command_args = std::env::args().collect::<Vec<_>>();
let (command, args) = command_args
.split_first()
Expand All @@ -765,7 +768,7 @@ async fn run_child_agent() -> Result<()> {
///
/// Captures SIGTERM signals sent by Kubernetes when the pod is gracefully deleted.
/// When a signal is captured, the child process is killed and the iptables are cleaned.
async fn start_iptable_guard(args: Args) -> Result<()> {
async fn start_iptable_guard(args: Args) -> AgentResult<()> {
debug!("start_iptable_guard -> Initializing iptable-guard.");

let state = State::new(&args).await?;
Expand Down Expand Up @@ -813,7 +816,7 @@ async fn start_iptable_guard(args: Args) -> Result<()> {
/// 1. If you try to `bind` a socket to some address before [`start_agent`], it'll actually
/// be bound **twice**, which incurs an error (address already in use). You could get around
/// this by `bind`ing on `0.0.0.0:0`, but this is most likely **not** what you want.
pub async fn main() -> Result<()> {
pub async fn main() -> AgentResult<()> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider())
.expect("Failed to install crypto provider");

Expand Down
4 changes: 2 additions & 2 deletions mirrord/agent/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use mirrord_protocol::RemoteResult;
use tokio::io::AsyncReadExt;
use wildmatch::WildMatch;

use crate::error::Result;
use crate::error::AgentResult;

struct EnvFilter {
include: Vec<WildMatch>,
Expand Down Expand Up @@ -97,7 +97,7 @@ pub(crate) fn parse_raw_env<'a, S: AsRef<str> + 'a + ?Sized, T: IntoIterator<Ite
.collect::<HashMap<_, _>>()
}

pub(crate) async fn get_proc_environ(path: PathBuf) -> Result<HashMap<String, String>> {
pub(crate) async fn get_proc_environ(path: PathBuf) -> AgentResult<HashMap<String, String>> {
let mut environ_file = tokio::fs::File::open(path).await?;

let mut raw_env_vars = String::with_capacity(8192);
Expand Down
5 changes: 4 additions & 1 deletion mirrord/agent/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ pub(crate) enum AgentError {
/// Temporary error for vpn feature
#[error("Generic error in vpn: {0}")]
VpnError(String),

#[error(transparent)]
AddrParse(#[from] std::net::AddrParseError),
}

impl From<mpsc::error::SendError<StealerCommand>> for AgentError {
Expand All @@ -92,4 +95,4 @@ impl From<mpsc::error::SendError<StealerCommand>> for AgentError {
}
}

pub(crate) type Result<T, E = AgentError> = std::result::Result<T, E>;
pub(crate) type AgentResult<T, E = AgentError> = std::result::Result<T, E>;
6 changes: 3 additions & 3 deletions mirrord/agent/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use libc::DT_DIR;
use mirrord_protocol::{file::*, FileRequest, FileResponse, RemoteResult, ResponseError};
use tracing::{error, trace, Level};

use crate::{error::Result, metrics::OPEN_FD_COUNT};
use crate::{error::AgentResult, metrics::OPEN_FD_COUNT};

#[derive(Debug)]
pub enum RemoteFile {
Expand Down Expand Up @@ -139,7 +139,7 @@ impl FileManager {
pub(crate) async fn handle_message(
&mut self,
request: FileRequest,
) -> Result<Option<FileResponse>> {
) -> AgentResult<Option<FileResponse>> {
Ok(match request {
FileRequest::Open(OpenFileRequest { path, open_options }) => {
// TODO: maybe not agent error on this?
Expand Down Expand Up @@ -857,7 +857,7 @@ impl FileManager {
// buffer (and there was no error converting to a
// `DirEntryInternal`.
while let Some(entry) = entry_results
.next_if(|entry_res: &Result<DirEntryInternal, io::Error>| {
.next_if(|entry_res: &AgentResult<DirEntryInternal, io::Error>| {
entry_res.as_ref().is_ok_and(|entry| {
entry.get_d_reclen64() as u64 + result_size <= buffer_size
})
Expand Down
3 changes: 2 additions & 1 deletion mirrord/agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,12 @@ mod vpn;
#[cfg(target_os = "linux")]
mod watched_task;

#[cfg(target_os = "linux")]
mod metrics;

#[cfg(target_os = "linux")]
#[tokio::main(flavor = "current_thread")]
async fn main() -> crate::error::Result<()> {
async fn main() -> crate::error::AgentResult<()> {
crate::entrypoint::main().await
}

Expand Down
6 changes: 3 additions & 3 deletions mirrord/agent/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::LazyLock;
use std::{net::SocketAddr, sync::LazyLock};

use axum::{response::IntoResponse, routing::get, Router};
use prometheus::{register_int_gauge, IntGauge};
Expand Down Expand Up @@ -104,10 +104,10 @@ async fn get_metrics() -> Result<String, MetricsError> {
}

#[tracing::instrument(level = Level::TRACE, skip_all, ret ,err)]
pub(crate) async fn start_metrics() -> Result<(), axum::BoxError> {
pub(crate) async fn start_metrics(address: SocketAddr) -> Result<(), axum::BoxError> {
let app = Router::new().route("/metrics", get(get_metrics));

let listener = TcpListener::bind("0.0.0.0:9000")
let listener = TcpListener::bind(address)
.await
.map_err(AgentError::from)
.inspect_err(|fail| tracing::error!(?fail, "Actor listener!"))?;
Expand Down
Loading

0 comments on commit be6b181

Please sign in to comment.