diff --git a/Cargo.lock b/Cargo.lock index c0ec2159964..3ae46051709 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3734,34 +3734,6 @@ dependencies = [ "serde_json", ] -[[package]] -name = "kameo" -version = "0.13.0" -source = "git+https://github.com/tqwewe/kameo?branch=main#fcd9987669d7530ec5853be8f05932b2d78c901d" -dependencies = [ - "dyn-clone", - "futures", - "itertools 0.13.0", - "kameo_macros", - "once_cell", - "serde", - "tokio", - "tokio-stream", - "tracing", -] - -[[package]] -name = "kameo_macros" -version = "0.13.0" -source = "git+https://github.com/tqwewe/kameo?branch=main#fcd9987669d7530ec5853be8f05932b2d78c901d" -dependencies = [ - "heck 0.5.0", - "proc-macro2", - "quote", - "syn 2.0.90", - "uuid", -] - [[package]] name = "konst" version = "0.3.15" @@ -4262,7 +4234,6 @@ dependencies = [ "hyper-util", "iptables", "k8s-cri", - "kameo", "libc", "mirrord-protocol", "mockall", @@ -7254,7 +7225,6 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", - "tracing", "windows-sys 0.52.0", ] diff --git a/mirrord/agent/Cargo.toml b/mirrord/agent/Cargo.toml index 2acca3d0ac9..e4757fccafe 100644 --- a/mirrord/agent/Cargo.toml +++ b/mirrord/agent/Cargo.toml @@ -71,7 +71,6 @@ rustls.workspace = true envy = "0.4" socket2.workspace = true prometheus = { version = "0.13", features = ["process"] } -kameo = { git = "https://github.com/tqwewe/kameo", branch = "main" } axum = { version = "0.7", features = ["macros"] } axum-server = "0.7" diff --git a/mirrord/agent/README.md b/mirrord/agent/README.md index 8b5fa759232..5cd0f1542e7 100644 --- a/mirrord/agent/README.md +++ b/mirrord/agent/README.md @@ -9,7 +9,21 @@ mirrord-agent is distributed as a container image (currently only x86) that is p ## Enabling prometheus metrics -TODO(alex) [mid]: Talk how to enable it from env whatever. +To start the metrics server, you'll need to add this config to your `mirrord.json`: + +```json +{ + "agent": { + "metrics": "0.0.0.0:9000", + "annotations": { + "prometheus.io/scrape": "true", + "prometheus.io/port": "9000" + } +} +``` + +Remember to change the `port` in both `metrics` and `annotations`, they have to match, +otherwise prometheus will try to scrape on `port: 80` or other commonly used ports. ### Installing prometheus diff --git a/mirrord/agent/src/cli.rs b/mirrord/agent/src/cli.rs index 6c5b11e65a2..a6b3feba535 100644 --- a/mirrord/agent/src/cli.rs +++ b/mirrord/agent/src/cli.rs @@ -1,7 +1,9 @@ #![deny(missing_docs)] use clap::{Parser, Subcommand}; -use mirrord_protocol::{MeshVendor, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV}; +use mirrord_protocol::{ + MeshVendor, AGENT_METRICS_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV, +}; const DEFAULT_RUNTIME: &str = "containerd"; @@ -26,6 +28,10 @@ pub struct Args { #[arg(short = 'i', long, env = AGENT_NETWORK_INTERFACE_ENV)] pub network_interface: Option, + /// Controls whether metrics are enabled, and the address to set up the metrics server. + #[arg(long, env = AGENT_METRICS_ENV)] + pub metrics: Option, + /// Return an error after accepting the first client connection, in order to test agent error /// cleanup. /// diff --git a/mirrord/agent/src/container_handle.rs b/mirrord/agent/src/container_handle.rs index 6e8ba78173d..dd6755e766d 100644 --- a/mirrord/agent/src/container_handle.rs +++ b/mirrord/agent/src/container_handle.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Arc}; use crate::{ - error::Result, + error::AgentResult, runtime::{Container, ContainerInfo, ContainerRuntime}, }; @@ -22,7 +22,7 @@ pub(crate) struct ContainerHandle(Arc); impl ContainerHandle { /// Retrieve info about the container and initialize this struct. #[tracing::instrument(level = "trace")] - pub(crate) async fn new(container: Container) -> Result { + pub(crate) async fn new(container: Container) -> AgentResult { let ContainerInfo { pid, env: raw_env } = container.get_info().await?; let inner = Inner { pid, raw_env }; diff --git a/mirrord/agent/src/dns.rs b/mirrord/agent/src/dns.rs index 0ad44c76934..4a374eab07e 100644 --- a/mirrord/agent/src/dns.rs +++ b/mirrord/agent/src/dns.rs @@ -17,7 +17,7 @@ use tokio_util::sync::CancellationToken; use tracing::Level; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, watched_task::TaskStatus, }; @@ -86,7 +86,7 @@ impl DnsWorker { // Prepares the `Resolver` after reading some `/etc` DNS files. // // We care about logging these errors, at an `error!` level. - let resolver: Result<_, ResponseError> = try { + let resolver: AgentResult<_, ResponseError> = try { let resolv_conf_path = etc_path.join("resolv.conf"); let hosts_path = etc_path.join("hosts"); @@ -139,7 +139,7 @@ impl DnsWorker { pub(crate) async fn run( mut self, cancellation_token: CancellationToken, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { loop { tokio::select! { _ = cancellation_token.cancelled() => break Ok(()), @@ -175,7 +175,7 @@ impl DnsApi { pub(crate) async fn make_request( &mut self, request: GetAddrInfoRequest, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { let (response_tx, response_rx) = oneshot::channel(); let command = DnsCommand { @@ -194,7 +194,7 @@ impl DnsApi { /// Returns the result of the oldest outstanding DNS request issued with this struct (see /// [`Self::make_request`]). #[tracing::instrument(level = Level::TRACE, skip(self), ret, err)] - pub(crate) async fn recv(&mut self) -> Result { + pub(crate) async fn recv(&mut self) -> AgentResult { let Some(response) = self.responses.next().await else { return future::pending().await; }; diff --git a/mirrord/agent/src/entrypoint.rs b/mirrord/agent/src/entrypoint.rs index 1f3fabec542..c72409e5f4e 100644 --- a/mirrord/agent/src/entrypoint.rs +++ b/mirrord/agent/src/entrypoint.rs @@ -33,7 +33,7 @@ use crate::{ client_connection::ClientConnection, container_handle::ContainerHandle, dns::DnsApi, - error::{AgentError, Result}, + error::{AgentError, AgentResult}, file::FileManager, outgoing::{TcpOutgoingApi, UdpOutgoingApi}, runtime::get_container, @@ -73,7 +73,7 @@ struct State { impl State { /// Return [`Err`] if container runtime operations failed. - pub async fn new(args: &Args) -> Result { + pub async fn new(args: &Args) -> AgentResult { let tls_connector = args .operator_tls_cert_pem .clone() @@ -213,7 +213,7 @@ impl ClientConnectionHandler { mut connection: ClientConnection, bg_tasks: BackgroundTasks, state: State, - ) -> Result { + ) -> AgentResult { let pid = state.container_pid(); let file_manager = FileManager::new(pid.or_else(|| state.ephemeral.then_some(1))); @@ -274,7 +274,7 @@ impl ClientConnectionHandler { id: ClientId, task: BackgroundTask, connection: &mut ClientConnection, - ) -> Result> { + ) -> AgentResult> { if let BackgroundTask::Running(stealer_status, stealer_sender) = task { match TcpStealerApi::new( id, @@ -314,7 +314,7 @@ impl ClientConnectionHandler { /// /// Breaks upon receiver/sender drop. #[tracing::instrument(level = "trace", skip(self))] - async fn start(mut self, cancellation_token: CancellationToken) -> Result<()> { + async fn start(mut self, cancellation_token: CancellationToken) -> AgentResult<()> { let error = loop { select! { message = self.connection.receive() => { @@ -390,7 +390,7 @@ impl ClientConnectionHandler { /// Sends a [`DaemonMessage`] response to the connected client (`mirrord-layer`). #[tracing::instrument(level = "trace", skip(self))] - async fn respond(&mut self, response: DaemonMessage) -> Result<()> { + async fn respond(&mut self, response: DaemonMessage) -> AgentResult<()> { self.connection.send(response).await.map_err(Into::into) } @@ -398,7 +398,7 @@ impl ClientConnectionHandler { /// /// Returns `false` if the client disconnected. #[tracing::instrument(level = Level::TRACE, skip(self), err)] - async fn handle_client_message(&mut self, message: ClientMessage) -> Result { + async fn handle_client_message(&mut self, message: ClientMessage) -> AgentResult { match message { ClientMessage::FileRequest(req) => { if let Some(response) = self.file_manager.handle_message(req).await? { @@ -490,14 +490,17 @@ impl ClientConnectionHandler { /// Initializes the agent's [`State`], channels, threads, and runs [`ClientConnectionHandler`]s. #[tracing::instrument(level = Level::TRACE, ret, err)] -async fn start_agent(args: Args) -> Result<()> { +async fn start_agent(args: Args) -> AgentResult<()> { trace!("start_agent -> Starting agent with args: {args:?}"); - tokio::spawn(async move { - start_metrics() - .await - .inspect_err(|fail| tracing::error!(?fail, "Failed starting metrics server!")) - }); + if let Some(metrics_address) = args.metrics.as_ref() { + let address = metrics_address.parse()?; + tokio::spawn(async move { + start_metrics(address) + .await + .inspect_err(|fail| tracing::error!(?fail, "Failed starting metrics server!")) + }); + } let listener = TcpListener::bind(SocketAddrV4::new( Ipv4Addr::UNSPECIFIED, @@ -730,7 +733,7 @@ async fn start_agent(args: Args) -> Result<()> { Ok(()) } -async fn clear_iptable_chain() -> Result<()> { +async fn clear_iptable_chain() -> AgentResult<()> { let ipt = new_iptables(); SafeIpTables::load(IPTablesWrapper::from(ipt), false) @@ -741,7 +744,7 @@ async fn clear_iptable_chain() -> Result<()> { Ok(()) } -async fn run_child_agent() -> Result<()> { +async fn run_child_agent() -> AgentResult<()> { let command_args = std::env::args().collect::>(); let (command, args) = command_args .split_first() @@ -765,7 +768,7 @@ async fn run_child_agent() -> Result<()> { /// /// Captures SIGTERM signals sent by Kubernetes when the pod is gracefully deleted. /// When a signal is captured, the child process is killed and the iptables are cleaned. -async fn start_iptable_guard(args: Args) -> Result<()> { +async fn start_iptable_guard(args: Args) -> AgentResult<()> { debug!("start_iptable_guard -> Initializing iptable-guard."); let state = State::new(&args).await?; @@ -813,7 +816,7 @@ async fn start_iptable_guard(args: Args) -> Result<()> { /// 1. If you try to `bind` a socket to some address before [`start_agent`], it'll actually /// be bound **twice**, which incurs an error (address already in use). You could get around /// this by `bind`ing on `0.0.0.0:0`, but this is most likely **not** what you want. -pub async fn main() -> Result<()> { +pub async fn main() -> AgentResult<()> { rustls::crypto::CryptoProvider::install_default(rustls::crypto::aws_lc_rs::default_provider()) .expect("Failed to install crypto provider"); diff --git a/mirrord/agent/src/env.rs b/mirrord/agent/src/env.rs index 26fa4681431..5a349709f2d 100644 --- a/mirrord/agent/src/env.rs +++ b/mirrord/agent/src/env.rs @@ -7,7 +7,7 @@ use mirrord_protocol::RemoteResult; use tokio::io::AsyncReadExt; use wildmatch::WildMatch; -use crate::error::Result; +use crate::error::AgentResult; struct EnvFilter { include: Vec, @@ -97,7 +97,7 @@ pub(crate) fn parse_raw_env<'a, S: AsRef + 'a + ?Sized, T: IntoIterator>() } -pub(crate) async fn get_proc_environ(path: PathBuf) -> Result> { +pub(crate) async fn get_proc_environ(path: PathBuf) -> AgentResult> { let mut environ_file = tokio::fs::File::open(path).await?; let mut raw_env_vars = String::with_capacity(8192); diff --git a/mirrord/agent/src/error.rs b/mirrord/agent/src/error.rs index ad04e49c8c5..c0c05fcfdf4 100644 --- a/mirrord/agent/src/error.rs +++ b/mirrord/agent/src/error.rs @@ -84,6 +84,9 @@ pub(crate) enum AgentError { /// Temporary error for vpn feature #[error("Generic error in vpn: {0}")] VpnError(String), + + #[error(transparent)] + AddrParse(#[from] std::net::AddrParseError), } impl From> for AgentError { @@ -92,4 +95,4 @@ impl From> for AgentError { } } -pub(crate) type Result = std::result::Result; +pub(crate) type AgentResult = std::result::Result; diff --git a/mirrord/agent/src/file.rs b/mirrord/agent/src/file.rs index 7f4c84fdee0..4ad1aa89266 100644 --- a/mirrord/agent/src/file.rs +++ b/mirrord/agent/src/file.rs @@ -14,7 +14,7 @@ use libc::DT_DIR; use mirrord_protocol::{file::*, FileRequest, FileResponse, RemoteResult, ResponseError}; use tracing::{error, trace, Level}; -use crate::{error::Result, metrics::OPEN_FD_COUNT}; +use crate::{error::AgentResult, metrics::OPEN_FD_COUNT}; #[derive(Debug)] pub enum RemoteFile { @@ -139,7 +139,7 @@ impl FileManager { pub(crate) async fn handle_message( &mut self, request: FileRequest, - ) -> Result> { + ) -> AgentResult> { Ok(match request { FileRequest::Open(OpenFileRequest { path, open_options }) => { // TODO: maybe not agent error on this? @@ -857,7 +857,7 @@ impl FileManager { // buffer (and there was no error converting to a // `DirEntryInternal`. while let Some(entry) = entry_results - .next_if(|entry_res: &Result| { + .next_if(|entry_res: &AgentResult| { entry_res.as_ref().is_ok_and(|entry| { entry.get_d_reclen64() as u64 + result_size <= buffer_size }) diff --git a/mirrord/agent/src/main.rs b/mirrord/agent/src/main.rs index 777bdb8feb1..b8c504041ec 100644 --- a/mirrord/agent/src/main.rs +++ b/mirrord/agent/src/main.rs @@ -40,11 +40,12 @@ mod vpn; #[cfg(target_os = "linux")] mod watched_task; +#[cfg(target_os = "linux")] mod metrics; #[cfg(target_os = "linux")] #[tokio::main(flavor = "current_thread")] -async fn main() -> crate::error::Result<()> { +async fn main() -> crate::error::AgentResult<()> { crate::entrypoint::main().await } diff --git a/mirrord/agent/src/metrics.rs b/mirrord/agent/src/metrics.rs index 66b1610cee5..4017f440fe7 100644 --- a/mirrord/agent/src/metrics.rs +++ b/mirrord/agent/src/metrics.rs @@ -1,4 +1,4 @@ -use std::sync::LazyLock; +use std::{net::SocketAddr, sync::LazyLock}; use axum::{response::IntoResponse, routing::get, Router}; use prometheus::{register_int_gauge, IntGauge}; @@ -104,10 +104,10 @@ async fn get_metrics() -> Result { } #[tracing::instrument(level = Level::TRACE, skip_all, ret ,err)] -pub(crate) async fn start_metrics() -> Result<(), axum::BoxError> { +pub(crate) async fn start_metrics(address: SocketAddr) -> Result<(), axum::BoxError> { let app = Router::new().route("/metrics", get(get_metrics)); - let listener = TcpListener::bind("0.0.0.0:9000") + let listener = TcpListener::bind(address) .await .map_err(AgentError::from) .inspect_err(|fail| tracing::error!(?fail, "Actor listener!"))?; diff --git a/mirrord/agent/src/outgoing.rs b/mirrord/agent/src/outgoing.rs index eb92ec68916..1e41c9ce942 100644 --- a/mirrord/agent/src/outgoing.rs +++ b/mirrord/agent/src/outgoing.rs @@ -18,7 +18,7 @@ use tokio_util::io::ReaderStream; use tracing::Level; use crate::{ - error::Result, + error::AgentResult, metrics::TCP_OUTGOING_CONNECTION, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, @@ -82,7 +82,7 @@ impl TcpOutgoingApi { /// Sends the [`LayerTcpOutgoing`] message to the background task. #[tracing::instrument(level = Level::TRACE, skip(self), err)] - pub(crate) async fn send_to_task(&mut self, message: LayerTcpOutgoing) -> Result<()> { + pub(crate) async fn send_to_task(&mut self, message: LayerTcpOutgoing) -> AgentResult<()> { if self.layer_tx.send(message).await.is_ok() { Ok(()) } else { @@ -92,7 +92,7 @@ impl TcpOutgoingApi { /// Receives a [`DaemonTcpOutgoing`] message from the background task. #[tracing::instrument(level = Level::TRACE, skip(self), err)] - pub(crate) async fn recv_from_task(&mut self) -> Result { + pub(crate) async fn recv_from_task(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), None => Err(self.task_status.unwrap_err().await), @@ -153,7 +153,7 @@ impl TcpOutgoingTask { /// Runs this task as long as the channels connecting it with [`TcpOutgoingApi`] are open. /// This routine never fails and returns [`Result`] only due to [`WatchedTask`] constraints. #[tracing::instrument(level = Level::TRACE, skip(self))] - async fn run(mut self) -> Result<()> { + async fn run(mut self) -> AgentResult<()> { loop { let channel_closed = select! { biased; @@ -191,7 +191,7 @@ impl TcpOutgoingTask { &mut self, connection_id: ConnectionId, read: io::Result>, - ) -> Result<(), SendError> { + ) -> AgentResult<(), SendError> { match read { // New bytes came in from a peer connection. // We pass them to the layer. @@ -266,7 +266,7 @@ impl TcpOutgoingTask { async fn handle_layer_msg( &mut self, message: LayerTcpOutgoing, - ) -> Result<(), SendError> { + ) -> AgentResult<(), SendError> { match message { // We make connection to the requested address, split the stream into halves with // `io::split`, and put them into respective maps. diff --git a/mirrord/agent/src/outgoing/udp.rs b/mirrord/agent/src/outgoing/udp.rs index 0a30fcbd5fa..4ab96dd1264 100644 --- a/mirrord/agent/src/outgoing/udp.rs +++ b/mirrord/agent/src/outgoing/udp.rs @@ -23,7 +23,7 @@ use tokio_util::{codec::BytesCodec, udp::UdpFramed}; use tracing::{debug, trace, warn}; use crate::{ - error::Result, + error::AgentResult, metrics::UDP_OUTGOING_CONNECTION, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, @@ -57,7 +57,7 @@ pub(crate) struct UdpOutgoingApi { /// 3. User is trying to use `sendto` and `recvfrom`, we use the same hack as in DNS to fake a /// connection. #[tracing::instrument(level = "trace", ret)] -async fn connect(remote_address: SocketAddr) -> Result { +async fn connect(remote_address: SocketAddr) -> AgentResult { let mirror_address = match remote_address { std::net::SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), std::net::SocketAddr::V6(_) => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), @@ -102,7 +102,7 @@ impl UdpOutgoingApi { async fn interceptor_task( mut layer_rx: Receiver, daemon_tx: Sender, - ) -> Result<()> { + ) -> AgentResult<()> { let mut connection_ids = 0..=ConnectionId::MAX; // TODO: Right now we're manually keeping these 2 maps in sync (aviram suggested using @@ -245,7 +245,7 @@ impl UdpOutgoingApi { } /// Sends a `UdpOutgoingRequest` to the `interceptor_task`. - pub(crate) async fn layer_message(&mut self, message: LayerUdpOutgoing) -> Result<()> { + pub(crate) async fn layer_message(&mut self, message: LayerUdpOutgoing) -> AgentResult<()> { trace!( "UdpOutgoingApi::layer_message -> layer_message {:#?}", message @@ -259,7 +259,7 @@ impl UdpOutgoingApi { } /// Receives a `UdpOutgoingResponse` from the `interceptor_task`. - pub(crate) async fn daemon_message(&mut self) -> Result { + pub(crate) async fn daemon_message(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), None => Err(self.task_status.unwrap_err().await), diff --git a/mirrord/agent/src/steal/api.rs b/mirrord/agent/src/steal/api.rs index a6ec1d8d1f7..7edaa0900b5 100644 --- a/mirrord/agent/src/steal/api.rs +++ b/mirrord/agent/src/steal/api.rs @@ -14,7 +14,7 @@ use tokio_stream::wrappers::ReceiverStream; use super::*; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, util::ClientId, watched_task::TaskStatus, }; @@ -53,7 +53,7 @@ impl TcpStealerApi { task_status: TaskStatus, channel_size: usize, protocol_version: semver::Version, - ) -> Result { + ) -> AgentResult { let (daemon_tx, daemon_rx) = mpsc::channel(channel_size); command_tx @@ -73,7 +73,7 @@ impl TcpStealerApi { } /// Send `command` to stealer, with the client id of the client that is using this API instance. - async fn send_command(&mut self, command: Command) -> Result<()> { + async fn send_command(&mut self, command: Command) -> AgentResult<()> { let command = StealerCommand { client_id: self.client_id, command, @@ -91,7 +91,7 @@ impl TcpStealerApi { /// /// Called in the `ClientConnectionHandler`. #[tracing::instrument(level = "trace", skip(self))] - pub(crate) async fn recv(&mut self) -> Result { + pub(crate) async fn recv(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => { if let DaemonTcp::Close(close) = &msg { @@ -108,7 +108,7 @@ impl TcpStealerApi { /// agent, to an internal stealer command [`Command::PortSubscribe`]. /// /// The actual handling of this message is done in [`TcpConnectionStealer`]. - pub(crate) async fn port_subscribe(&mut self, port_steal: StealType) -> Result<(), AgentError> { + pub(crate) async fn port_subscribe(&mut self, port_steal: StealType) -> AgentResult<(), AgentError> { self.send_command(Command::PortSubscribe(port_steal)).await } @@ -116,7 +116,7 @@ impl TcpStealerApi { /// agent, to an internal stealer command [`Command::PortUnsubscribe`]. /// /// The actual handling of this message is done in [`TcpConnectionStealer`]. - pub(crate) async fn port_unsubscribe(&mut self, port: Port) -> Result<(), AgentError> { + pub(crate) async fn port_unsubscribe(&mut self, port: Port) -> AgentResult<(), AgentError> { self.send_command(Command::PortUnsubscribe(port)).await } @@ -127,7 +127,7 @@ impl TcpStealerApi { pub(crate) async fn connection_unsubscribe( &mut self, connection_id: ConnectionId, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { self.send_command(Command::ConnectionUnsubscribe(connection_id)) .await } @@ -136,7 +136,7 @@ impl TcpStealerApi { /// agent, to an internal stealer command [`Command::ResponseData`]. /// /// The actual handling of this message is done in [`TcpConnectionStealer`]. - pub(crate) async fn client_data(&mut self, tcp_data: TcpData) -> Result<(), AgentError> { + pub(crate) async fn client_data(&mut self, tcp_data: TcpData) -> AgentResult<(), AgentError> { self.send_command(Command::ResponseData(tcp_data)).await } @@ -147,19 +147,19 @@ impl TcpStealerApi { pub(crate) async fn http_response( &mut self, response: HttpResponseFallback, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { self.send_command(Command::HttpResponse(response)).await } pub(crate) async fn switch_protocol_version( &mut self, version: semver::Version, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { self.send_command(Command::SwitchProtocolVersion(version)) .await } - pub(crate) async fn handle_client_message(&mut self, message: LayerTcpSteal) -> Result<()> { + pub(crate) async fn handle_client_message(&mut self, message: LayerTcpSteal) -> AgentResult<()> { match message { LayerTcpSteal::PortSubscribe(port_steal) => self.port_subscribe(port_steal).await, LayerTcpSteal::ConnectionUnsubscribe(connection_id) => { diff --git a/mirrord/agent/src/steal/connection.rs b/mirrord/agent/src/steal/connection.rs index 3657c55ebd8..ab30fc61499 100644 --- a/mirrord/agent/src/steal/connection.rs +++ b/mirrord/agent/src/steal/connection.rs @@ -32,7 +32,7 @@ use tokio_util::sync::CancellationToken; use tracing::{warn, Level}; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, metrics::{ STEAL_CONNECTION_SUBSCRIPTION, STEAL_FILTERED_PORT_SUBSCRIPTION, STEAL_UNFILTERED_PORT_SUBSCRIPTION, @@ -59,7 +59,7 @@ struct MatchedHttpRequest { } impl MatchedHttpRequest { - async fn into_serializable(self) -> Result, hyper::Error> { + async fn into_serializable(self) -> AgentResult, hyper::Error> { let ( Parts { method, @@ -89,7 +89,7 @@ impl MatchedHttpRequest { }) } - async fn into_serializable_fallback(self) -> Result>, hyper::Error> { + async fn into_serializable_fallback(self) -> AgentResult>, hyper::Error> { let ( Parts { method, @@ -185,7 +185,7 @@ impl Client { let frames = frames .into_iter() .map(InternalHttpBodyFrame::try_from) - .filter_map(Result::ok) + .filter_map(AgentResult::ok) .collect(); let message = DaemonTcp::HttpRequestChunked(ChunkedRequest::Start(HttpRequest { @@ -214,7 +214,7 @@ impl Client { let frames = frames .into_iter() .map(InternalHttpBodyFrame::try_from) - .filter_map(Result::ok) + .filter_map(AgentResult::ok) .collect(); let message = DaemonTcp::HttpRequestChunked(ChunkedRequest::Body( ChunkedHttpBody { @@ -303,7 +303,7 @@ impl TcpConnectionStealer { /// Initializes a new [`TcpConnectionStealer`], but doesn't start the actual work. /// You need to call [`TcpConnectionStealer::start`] to do so. #[tracing::instrument(level = Level::TRACE, err)] - pub(crate) async fn new(command_rx: Receiver) -> Result { + pub(crate) async fn new(command_rx: Receiver) -> AgentResult { let config = envy::prefixed("MIRRORD_AGENT_") .from_env::() .unwrap_or_default(); @@ -339,7 +339,7 @@ impl TcpConnectionStealer { pub(crate) async fn start( mut self, cancellation_token: CancellationToken, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { loop { tokio::select! { command = self.command_rx.recv() => { @@ -379,7 +379,11 @@ impl TcpConnectionStealer { /// Handles a new remote connection that was stolen by [`Self::port_subscriptions`]. #[tracing::instrument(level = "trace", skip(self))] - async fn incoming_connection(&mut self, stream: TcpStream, peer: SocketAddr) -> Result<()> { + async fn incoming_connection( + &mut self, + stream: TcpStream, + peer: SocketAddr, + ) -> AgentResult<()> { let mut real_address = orig_dst::orig_dst_addr(&stream)?; // If we use the original IP we would go through prerouting and hit a loop. // localhost should always work. @@ -413,7 +417,7 @@ impl TcpConnectionStealer { async fn handle_connection_update( &mut self, update: ConnectionMessageOut, - ) -> Result<(), AgentError> { + ) -> AgentResult<(), AgentError> { match update { ConnectionMessageOut::Closed { connection_id, @@ -551,7 +555,11 @@ impl TcpConnectionStealer { /// /// - Returns: `true` if this is an HTTP filtered subscription. #[tracing::instrument(level = Level::TRACE, skip(self), err)] - async fn port_subscribe(&mut self, client_id: ClientId, port_steal: StealType) -> Result { + async fn port_subscribe( + &mut self, + client_id: ClientId, + port_steal: StealType, + ) -> AgentResult { let spec = match port_steal { StealType::All(port) => Ok((port, None)), StealType::FilteredHttp(port, filter) => Regex::new(&format!("(?i){filter}")) @@ -582,7 +590,7 @@ impl TcpConnectionStealer { /// their subscriptions from [`Self::port_subscriptions`] and all their open /// connections. #[tracing::instrument(level = "trace", skip(self))] - async fn close_client(&mut self, client_id: ClientId) -> Result<(), AgentError> { + async fn close_client(&mut self, client_id: ClientId) -> AgentResult<(), AgentError> { let removed_subscriptions = self.port_subscriptions.remove_all(client_id).await?; for filtered in removed_subscriptions { @@ -647,7 +655,7 @@ impl TcpConnectionStealer { /// Handles [`Command`]s that were received by [`TcpConnectionStealer::command_rx`]. #[tracing::instrument(level = Level::TRACE, skip(self), err)] - async fn handle_command(&mut self, command: StealerCommand) -> Result<(), AgentError> { + async fn handle_command(&mut self, command: StealerCommand) -> AgentResult<(), AgentError> { let StealerCommand { client_id, command } = command; match command { diff --git a/mirrord/agent/src/steal/ip_tables.rs b/mirrord/agent/src/steal/ip_tables.rs index c25ff1eb36c..6b131262cbf 100644 --- a/mirrord/agent/src/steal/ip_tables.rs +++ b/mirrord/agent/src/steal/ip_tables.rs @@ -9,7 +9,7 @@ use rand::distributions::{Alphanumeric, DistString}; use tracing::warn; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, steal::ip_tables::{ flush_connections::FlushConnections, mesh::{istio::AmbientRedirect, MeshRedirect, MeshVendorExt}, @@ -24,26 +24,26 @@ mod iptables { pub struct IPTables; impl IPTables { - pub fn list(&self, _: &str, _: &str) -> Result, String> { + pub fn list(&self, _: &str, _: &str) -> AgentResult, String> { todo!() } - pub fn insert(&self, _: &str, _: &str, _: &str, _: i32) -> Result<(), String> { + pub fn insert(&self, _: &str, _: &str, _: &str, _: i32) -> AgentResult<(), String> { todo!() } - pub fn append(&self, _: &str, _: &str, _: &str) -> Result<(), String> { + pub fn append(&self, _: &str, _: &str, _: &str) -> AgentResult<(), String> { todo!() } - pub fn delete(&self, _: &str, _: &str, _: &str) -> Result<(), String> { + pub fn delete(&self, _: &str, _: &str, _: &str) -> AgentResult<(), String> { todo!() } - pub fn new_chain(&self, _: &str, _: &str) -> Result<(), String> { + pub fn new_chain(&self, _: &str, _: &str) -> AgentResult<(), String> { todo!() } - pub fn delete_chain(&self, _: &str, _: &str) -> Result<(), String> { + pub fn delete_chain(&self, _: &str, _: &str) -> AgentResult<(), String> { todo!() } - pub fn flush_chain(&self, _: &str, _: &str) -> Result<(), String> { + pub fn flush_chain(&self, _: &str, _: &str) -> AgentResult<(), String> { todo!() } } @@ -114,13 +114,13 @@ pub(crate) trait IPTables { where Self: Sized; - fn create_chain(&self, name: &str) -> Result<()>; - fn remove_chain(&self, name: &str) -> Result<()>; + fn create_chain(&self, name: &str) -> AgentResult<()>; + fn remove_chain(&self, name: &str) -> AgentResult<()>; - fn add_rule(&self, chain: &str, rule: &str) -> Result<()>; - fn insert_rule(&self, chain: &str, rule: &str, index: i32) -> Result<()>; - fn list_rules(&self, chain: &str) -> Result>; - fn remove_rule(&self, chain: &str, rule: &str) -> Result<()>; + fn add_rule(&self, chain: &str, rule: &str) -> AgentResult<()>; + fn insert_rule(&self, chain: &str, rule: &str, index: i32) -> AgentResult<()>; + fn list_rules(&self, chain: &str) -> AgentResult>; + fn remove_rule(&self, chain: &str, rule: &str) -> AgentResult<()>; } #[derive(Clone)] @@ -171,7 +171,7 @@ impl IPTables for IPTablesWrapper { } #[tracing::instrument(level = "trace")] - fn create_chain(&self, name: &str) -> Result<()> { + fn create_chain(&self, name: &str) -> AgentResult<()> { self.tables .new_chain(self.table_name, name) .map_err(|e| AgentError::IPTablesError(e.to_string()))?; @@ -183,7 +183,7 @@ impl IPTables for IPTablesWrapper { } #[tracing::instrument(level = "trace")] - fn remove_chain(&self, name: &str) -> Result<()> { + fn remove_chain(&self, name: &str) -> AgentResult<()> { self.tables .flush_chain(self.table_name, name) .map_err(|e| AgentError::IPTablesError(e.to_string()))?; @@ -195,28 +195,28 @@ impl IPTables for IPTablesWrapper { } #[tracing::instrument(level = "trace", ret)] - fn add_rule(&self, chain: &str, rule: &str) -> Result<()> { + fn add_rule(&self, chain: &str, rule: &str) -> AgentResult<()> { self.tables .append(self.table_name, chain, rule) .map_err(|e| AgentError::IPTablesError(e.to_string())) } #[tracing::instrument(level = "trace", ret)] - fn insert_rule(&self, chain: &str, rule: &str, index: i32) -> Result<()> { + fn insert_rule(&self, chain: &str, rule: &str, index: i32) -> AgentResult<()> { self.tables .insert(self.table_name, chain, rule, index) .map_err(|e| AgentError::IPTablesError(e.to_string())) } #[tracing::instrument(level = "trace")] - fn list_rules(&self, chain: &str) -> Result> { + fn list_rules(&self, chain: &str) -> AgentResult> { self.tables .list(self.table_name, chain) .map_err(|e| AgentError::IPTablesError(e.to_string())) } #[tracing::instrument(level = "trace")] - fn remove_rule(&self, chain: &str, rule: &str) -> Result<()> { + fn remove_rule(&self, chain: &str, rule: &str) -> AgentResult<()> { self.tables .delete(self.table_name, chain, rule) .map_err(|e| AgentError::IPTablesError(e.to_string())) @@ -250,7 +250,7 @@ where ipt: IPT, flush_connections: bool, pod_ips: Option<&str>, - ) -> Result { + ) -> AgentResult { let ipt = Arc::new(ipt); let mut redirect = if let Some(vendor) = MeshVendor::detect(ipt.as_ref())? { @@ -281,7 +281,7 @@ where Ok(Self { redirect }) } - pub(crate) async fn load(ipt: IPT, flush_connections: bool) -> Result { + pub(crate) async fn load(ipt: IPT, flush_connections: bool) -> AgentResult { let ipt = Arc::new(ipt); let mut redirect = if let Some(vendor) = MeshVendor::detect(ipt.as_ref())? { @@ -315,7 +315,7 @@ where &self, redirected_port: Port, target_port: Port, - ) -> Result<()> { + ) -> AgentResult<()> { self.redirect .add_redirect(redirected_port, target_port) .await @@ -330,13 +330,13 @@ where &self, redirected_port: Port, target_port: Port, - ) -> Result<()> { + ) -> AgentResult<()> { self.redirect .remove_redirect(redirected_port, target_port) .await } - pub(crate) async fn cleanup(&self) -> Result<()> { + pub(crate) async fn cleanup(&self) -> AgentResult<()> { self.redirect.unmount_entrypoint().await } } diff --git a/mirrord/agent/src/steal/ip_tables/chain.rs b/mirrord/agent/src/steal/ip_tables/chain.rs index c5bc6d65404..c1c34715c85 100644 --- a/mirrord/agent/src/steal/ip_tables/chain.rs +++ b/mirrord/agent/src/steal/ip_tables/chain.rs @@ -4,7 +4,7 @@ use std::sync::{ }; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, steal::ip_tables::IPTables, }; @@ -19,7 +19,7 @@ impl IPTableChain where IPT: IPTables, { - pub fn create(inner: Arc, chain_name: String) -> Result { + pub fn create(inner: Arc, chain_name: String) -> AgentResult { inner.create_chain(&chain_name)?; // Start with 1 because the chain will allways have atleast `-A ` as a rule @@ -32,7 +32,7 @@ where }) } - pub fn load(inner: Arc, chain_name: String) -> Result { + pub fn load(inner: Arc, chain_name: String) -> AgentResult { let existing_rules = inner.list_rules(&chain_name)?.len(); if existing_rules == 0 { @@ -59,7 +59,7 @@ where &self.inner } - pub fn add_rule(&self, rule: &str) -> Result { + pub fn add_rule(&self, rule: &str) -> AgentResult { self.inner .insert_rule( &self.chain_name, @@ -72,7 +72,7 @@ where }) } - pub fn remove_rule(&self, rule: &str) -> Result<()> { + pub fn remove_rule(&self, rule: &str) -> AgentResult<()> { self.inner.remove_rule(&self.chain_name, rule)?; self.chain_size.fetch_sub(1, Ordering::Relaxed); diff --git a/mirrord/agent/src/steal/ip_tables/flush_connections.rs b/mirrord/agent/src/steal/ip_tables/flush_connections.rs index 6675a40651f..c0f19c20b8d 100644 --- a/mirrord/agent/src/steal/ip_tables/flush_connections.rs +++ b/mirrord/agent/src/steal/ip_tables/flush_connections.rs @@ -13,7 +13,7 @@ use tokio::process::Command; use tracing::warn; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{chain::IPTableChain, redirect::Redirect, IPTables, IPTABLE_INPUT}, }; @@ -33,7 +33,7 @@ where const ENTRYPOINT: &'static str = "INPUT"; #[tracing::instrument(level = "trace", skip(ipt, inner))] - pub fn create(ipt: Arc, inner: Box) -> Result { + pub fn create(ipt: Arc, inner: Box) -> AgentResult { let managed = IPTableChain::create(ipt.with_table("filter").into(), IPTABLE_INPUT.to_string())?; @@ -48,7 +48,7 @@ where } #[tracing::instrument(level = "trace", skip(ipt, inner))] - pub fn load(ipt: Arc, inner: Box) -> Result { + pub fn load(ipt: Arc, inner: Box) -> AgentResult { let managed = IPTableChain::load(ipt.with_table("filter").into(), IPTABLE_INPUT.to_string())?; @@ -63,7 +63,7 @@ where T: Redirect + Send + Sync, { #[tracing::instrument(level = "trace", skip(self), ret)] - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { self.inner.mount_entrypoint().await?; self.managed.inner().add_rule( @@ -75,7 +75,7 @@ where } #[tracing::instrument(level = "trace", skip(self), ret)] - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.inner.unmount_entrypoint().await?; self.managed.inner().remove_rule( @@ -87,7 +87,7 @@ where } #[tracing::instrument(level = "trace", skip(self), ret)] - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.inner .add_redirect(redirected_port, target_port) .await?; @@ -115,7 +115,7 @@ where } #[tracing::instrument(level = "trace", skip(self), ret)] - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.inner .remove_redirect(redirected_port, target_port) .await?; diff --git a/mirrord/agent/src/steal/ip_tables/mesh.rs b/mirrord/agent/src/steal/ip_tables/mesh.rs index 88fdff5d0b1..1a3e5acbe62 100644 --- a/mirrord/agent/src/steal/ip_tables/mesh.rs +++ b/mirrord/agent/src/steal/ip_tables/mesh.rs @@ -5,7 +5,7 @@ use fancy_regex::Regex; use mirrord_protocol::{MeshVendor, Port}; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{ output::OutputRedirect, prerouting::PreroutingRedirect, redirect::Redirect, IPTables, IPTABLE_MESH, @@ -29,7 +29,7 @@ impl MeshRedirect where IPT: IPTables, { - pub fn create(ipt: Arc, vendor: MeshVendor, pod_ips: Option<&str>) -> Result { + pub fn create(ipt: Arc, vendor: MeshVendor, pod_ips: Option<&str>) -> AgentResult { let prerouting = PreroutingRedirect::create(ipt.clone())?; for port in Self::get_skip_ports(&ipt, &vendor)? { @@ -45,7 +45,7 @@ where }) } - pub fn load(ipt: Arc, vendor: MeshVendor) -> Result { + pub fn load(ipt: Arc, vendor: MeshVendor) -> AgentResult { let prerouting = PreroutingRedirect::load(ipt.clone())?; let output = OutputRedirect::load(ipt, IPTABLE_MESH.to_string())?; @@ -56,7 +56,7 @@ where }) } - fn get_skip_ports(ipt: &IPT, vendor: &MeshVendor) -> Result> { + fn get_skip_ports(ipt: &IPT, vendor: &MeshVendor) -> AgentResult> { let chain_name = vendor.input_chain(); let lookup_regex = if let Some(regex) = vendor.skip_ports_regex() { regex @@ -86,21 +86,21 @@ impl Redirect for MeshRedirect where IPT: IPTables + Send + Sync, { - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { self.prerouting.mount_entrypoint().await?; self.output.mount_entrypoint().await?; Ok(()) } - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.prerouting.unmount_entrypoint().await?; self.output.unmount_entrypoint().await?; Ok(()) } - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { if self.vendor != MeshVendor::IstioCni { self.prerouting .add_redirect(redirected_port, target_port) @@ -113,7 +113,7 @@ where Ok(()) } - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { if self.vendor != MeshVendor::IstioCni { self.prerouting .remove_redirect(redirected_port, target_port) @@ -129,13 +129,13 @@ where /// Extends the [`MeshVendor`] type with methods that are only relevant for the agent. pub(super) trait MeshVendorExt: Sized { - fn detect(ipt: &IPT) -> Result>; + fn detect(ipt: &IPT) -> AgentResult>; fn input_chain(&self) -> &str; fn skip_ports_regex(&self) -> Option<&Regex>; } impl MeshVendorExt for MeshVendor { - fn detect(ipt: &IPT) -> Result> { + fn detect(ipt: &IPT) -> AgentResult> { if let Ok(val) = std::env::var("MIRRORD_AGENT_ISTIO_CNI") && val.to_lowercase() == "true" { diff --git a/mirrord/agent/src/steal/ip_tables/mesh/istio.rs b/mirrord/agent/src/steal/ip_tables/mesh/istio.rs index cd3d4b06fa9..01e513a6bf9 100644 --- a/mirrord/agent/src/steal/ip_tables/mesh/istio.rs +++ b/mirrord/agent/src/steal/ip_tables/mesh/istio.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use mirrord_protocol::Port; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{ output::OutputRedirect, prerouting::PreroutingRedirect, redirect::Redirect, IPTables, IPTABLE_IPV4_ROUTE_LOCALNET_ORIGINAL, IPTABLE_MESH, @@ -20,14 +20,14 @@ impl AmbientRedirect where IPT: IPTables, { - pub fn create(ipt: Arc, pod_ips: Option<&str>) -> Result { + pub fn create(ipt: Arc, pod_ips: Option<&str>) -> AgentResult { let prerouting = PreroutingRedirect::create(ipt.clone())?; let output = OutputRedirect::create(ipt, IPTABLE_MESH.to_string(), pod_ips)?; Ok(AmbientRedirect { prerouting, output }) } - pub fn load(ipt: Arc) -> Result { + pub fn load(ipt: Arc) -> AgentResult { let prerouting = PreroutingRedirect::load(ipt.clone())?; let output = OutputRedirect::load(ipt, IPTABLE_MESH.to_string())?; @@ -40,7 +40,7 @@ impl Redirect for AmbientRedirect where IPT: IPTables + Send + Sync, { - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { tokio::fs::write("/proc/sys/net/ipv4/conf/all/route_localnet", "1".as_bytes()).await?; self.prerouting.mount_entrypoint().await?; @@ -49,7 +49,7 @@ where Ok(()) } - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.prerouting.unmount_entrypoint().await?; self.output.unmount_entrypoint().await?; @@ -62,7 +62,7 @@ where Ok(()) } - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.prerouting .add_redirect(redirected_port, target_port) .await?; @@ -73,7 +73,7 @@ where Ok(()) } - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.prerouting .remove_redirect(redirected_port, target_port) .await?; diff --git a/mirrord/agent/src/steal/ip_tables/output.rs b/mirrord/agent/src/steal/ip_tables/output.rs index 944bc26f95b..b28cc93b24b 100644 --- a/mirrord/agent/src/steal/ip_tables/output.rs +++ b/mirrord/agent/src/steal/ip_tables/output.rs @@ -6,7 +6,7 @@ use nix::unistd::getgid; use tracing::warn; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{chain::IPTableChain, IPTables, Redirect}, }; @@ -20,7 +20,7 @@ where { const ENTRYPOINT: &'static str = "OUTPUT"; - pub fn create(ipt: Arc, chain_name: String, pod_ips: Option<&str>) -> Result { + pub fn create(ipt: Arc, chain_name: String, pod_ips: Option<&str>) -> AgentResult { let managed = IPTableChain::create(ipt, chain_name)?; let exclude_source_ips = pod_ips @@ -39,7 +39,7 @@ where Ok(OutputRedirect { managed }) } - pub fn load(ipt: Arc, chain_name: String) -> Result { + pub fn load(ipt: Arc, chain_name: String) -> AgentResult { let managed = IPTableChain::load(ipt, chain_name)?; Ok(OutputRedirect { managed }) @@ -53,7 +53,7 @@ impl Redirect for OutputRedirect where IPT: IPTables + Send + Sync, { - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { if USE_INSERT { self.managed.inner().insert_rule( Self::ENTRYPOINT, @@ -70,7 +70,7 @@ where Ok(()) } - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.managed.inner().remove_rule( Self::ENTRYPOINT, &format!("-j {}", self.managed.chain_name()), @@ -79,7 +79,7 @@ where Ok(()) } - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { let redirect_rule = format!( "-o lo -m tcp -p tcp --dport {redirected_port} -j REDIRECT --to-ports {target_port}" ); @@ -89,7 +89,7 @@ where Ok(()) } - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { let redirect_rule = format!( "-o lo -m tcp -p tcp --dport {redirected_port} -j REDIRECT --to-ports {target_port}" ); diff --git a/mirrord/agent/src/steal/ip_tables/prerouting.rs b/mirrord/agent/src/steal/ip_tables/prerouting.rs index 486b0ca1b51..29d5de06103 100644 --- a/mirrord/agent/src/steal/ip_tables/prerouting.rs +++ b/mirrord/agent/src/steal/ip_tables/prerouting.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use mirrord_protocol::Port; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{chain::IPTableChain, IPTables, Redirect, IPTABLE_PREROUTING}, }; @@ -18,13 +18,13 @@ where { const ENTRYPOINT: &'static str = "PREROUTING"; - pub fn create(ipt: Arc) -> Result { + pub fn create(ipt: Arc) -> AgentResult { let managed = IPTableChain::create(ipt, IPTABLE_PREROUTING.to_string())?; Ok(PreroutingRedirect { managed }) } - pub fn load(ipt: Arc) -> Result { + pub fn load(ipt: Arc) -> AgentResult { let managed = IPTableChain::load(ipt, IPTABLE_PREROUTING.to_string())?; Ok(PreroutingRedirect { managed }) @@ -36,7 +36,7 @@ impl Redirect for PreroutingRedirect where IPT: IPTables + Send + Sync, { - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { self.managed.inner().add_rule( Self::ENTRYPOINT, &format!("-j {}", self.managed.chain_name()), @@ -45,7 +45,7 @@ where Ok(()) } - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.managed.inner().remove_rule( Self::ENTRYPOINT, &format!("-j {}", self.managed.chain_name()), @@ -54,7 +54,7 @@ where Ok(()) } - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { let redirect_rule = format!("-m tcp -p tcp --dport {redirected_port} -j REDIRECT --to-ports {target_port}"); @@ -63,7 +63,7 @@ where Ok(()) } - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { let redirect_rule = format!("-m tcp -p tcp --dport {redirected_port} -j REDIRECT --to-ports {target_port}"); diff --git a/mirrord/agent/src/steal/ip_tables/redirect.rs b/mirrord/agent/src/steal/ip_tables/redirect.rs index d18aeb1d7ea..fe52d90fc1e 100644 --- a/mirrord/agent/src/steal/ip_tables/redirect.rs +++ b/mirrord/agent/src/steal/ip_tables/redirect.rs @@ -2,17 +2,17 @@ use async_trait::async_trait; use enum_dispatch::enum_dispatch; use mirrord_protocol::Port; -use crate::error::Result; +use crate::error::AgentResult; #[async_trait] #[enum_dispatch] pub(crate) trait Redirect { - async fn mount_entrypoint(&self) -> Result<()>; + async fn mount_entrypoint(&self) -> AgentResult<()>; - async fn unmount_entrypoint(&self) -> Result<()>; + async fn unmount_entrypoint(&self) -> AgentResult<()>; /// Create port redirection - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()>; + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()>; /// Remove port redirection - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()>; + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()>; } diff --git a/mirrord/agent/src/steal/ip_tables/standard.rs b/mirrord/agent/src/steal/ip_tables/standard.rs index 3302b05c02e..47b9bf0c0af 100644 --- a/mirrord/agent/src/steal/ip_tables/standard.rs +++ b/mirrord/agent/src/steal/ip_tables/standard.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use mirrord_protocol::Port; use crate::{ - error::Result, + error::AgentResult, steal::ip_tables::{ output::OutputRedirect, prerouting::PreroutingRedirect, IPTables, Redirect, IPTABLE_STANDARD, @@ -20,14 +20,14 @@ impl StandardRedirect where IPT: IPTables, { - pub fn create(ipt: Arc, pod_ips: Option<&str>) -> Result { + pub fn create(ipt: Arc, pod_ips: Option<&str>) -> AgentResult { let prerouting = PreroutingRedirect::create(ipt.clone())?; let output = OutputRedirect::create(ipt, IPTABLE_STANDARD.to_string(), pod_ips)?; Ok(StandardRedirect { prerouting, output }) } - pub fn load(ipt: Arc) -> Result { + pub fn load(ipt: Arc) -> AgentResult { let prerouting = PreroutingRedirect::load(ipt.clone())?; let output = OutputRedirect::load(ipt, IPTABLE_STANDARD.to_string())?; @@ -42,21 +42,21 @@ impl Redirect for StandardRedirect where IPT: IPTables + Send + Sync, { - async fn mount_entrypoint(&self) -> Result<()> { + async fn mount_entrypoint(&self) -> AgentResult<()> { self.prerouting.mount_entrypoint().await?; self.output.mount_entrypoint().await?; Ok(()) } - async fn unmount_entrypoint(&self) -> Result<()> { + async fn unmount_entrypoint(&self) -> AgentResult<()> { self.prerouting.unmount_entrypoint().await?; self.output.unmount_entrypoint().await?; Ok(()) } - async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn add_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.prerouting .add_redirect(redirected_port, target_port) .await?; @@ -67,7 +67,7 @@ where Ok(()) } - async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> Result<()> { + async fn remove_redirect(&self, redirected_port: Port, target_port: Port) -> AgentResult<()> { self.prerouting .remove_redirect(redirected_port, target_port) .await?; diff --git a/mirrord/agent/src/vpn.rs b/mirrord/agent/src/vpn.rs index dd8c3a5133f..d7d30d5ca6f 100644 --- a/mirrord/agent/src/vpn.rs +++ b/mirrord/agent/src/vpn.rs @@ -17,7 +17,7 @@ use tokio::{ }; use crate::{ - error::{AgentError, Result}, + error::{AgentError, AgentResult}, util::run_thread_in_namespace, watched_task::{TaskStatus, WatchedTask}, }; @@ -75,7 +75,7 @@ impl VpnApi { /// Sends the [`ClientVpn`] message to the background task. #[tracing::instrument(level = "trace", skip(self))] - pub(crate) async fn layer_message(&mut self, message: ClientVpn) -> Result<()> { + pub(crate) async fn layer_message(&mut self, message: ClientVpn) -> AgentResult<()> { if self.layer_tx.send(message).await.is_ok() { Ok(()) } else { @@ -84,7 +84,7 @@ impl VpnApi { } /// Receives a [`ServerVpn`] message from the background task. - pub(crate) async fn daemon_message(&mut self) -> Result { + pub(crate) async fn daemon_message(&mut self) -> AgentResult { match self.daemon_rx.recv().await { Some(msg) => Ok(msg), None => Err(self.task_status.unwrap_err().await), @@ -121,7 +121,7 @@ impl AsyncRawSocket { } } -async fn create_raw_socket() -> Result { +async fn create_raw_socket() -> AgentResult { let index = nix::net::if_::if_nametoindex("eth0") .map_err(|err| AgentError::VpnError(err.to_string()))?; @@ -139,7 +139,7 @@ async fn create_raw_socket() -> Result { } #[tracing::instrument(level = "debug", ret)] -async fn resolve_interface() -> Result<(IpAddr, IpAddr, IpAddr)> { +async fn resolve_interface() -> AgentResult<(IpAddr, IpAddr, IpAddr)> { // Connect to a remote address so we can later get the default network interface. let temporary_socket = UdpSocket::bind("0.0.0.0:0").await?; temporary_socket.connect("8.8.8.8:53").await?; @@ -209,7 +209,7 @@ impl fmt::Debug for VpnTask { } } -fn interface_index_to_sock_addr(index: i32) -> Result { +fn interface_index_to_sock_addr(index: i32) -> AgentResult { let mut addr_storage: libc::sockaddr_storage = unsafe { std::mem::zeroed() }; let len = std::mem::size_of::() as libc::socklen_t; let macs = procfs::net::arp().map_err(|err| AgentError::VpnError(err.to_string()))?; @@ -245,7 +245,7 @@ impl VpnTask { } #[allow(clippy::indexing_slicing)] - async fn run(mut self) -> Result<()> { + async fn run(mut self) -> AgentResult<()> { // so host won't respond with RST to our packets. // TODO: need to do it for UDP as well to avoid ICMP unreachable. let output = std::process::Command::new("iptables") @@ -318,7 +318,7 @@ impl VpnTask { &mut self, message: ClientVpn, network_configuration: &NetworkConfiguration, - ) -> Result<()> { + ) -> AgentResult<()> { match message { // We make connection to the requested address, split the stream into halves with // `io::split`, and put them into respective maps. diff --git a/mirrord/config/src/agent.rs b/mirrord/config/src/agent.rs index b82c45a7177..79b3bdd27a9 100644 --- a/mirrord/config/src/agent.rs +++ b/mirrord/config/src/agent.rs @@ -322,7 +322,11 @@ pub struct AgentConfig { /// /// ```json /// { - /// "annotations": { "cats.io/inject": "enabled" } + /// "annotations": { + /// "cats.io/inject": "enabled" + /// "prometheus.io/scrape": "true", + /// "prometheus.io/port": "9000" + /// } /// } /// ``` pub annotations: Option>, @@ -350,6 +354,21 @@ pub struct AgentConfig { /// ``` pub service_account: Option, + /// ### agent.metrics {#agent-metrics} + /// + /// Enables prometheus metrics for the agent pod. + /// + /// You might need to add annotations to the agent pod depending on how prometheus is + /// configured to scrape for metrics. + /// + /// ```json + /// { + /// "metrics": "0.0.0.0:9000" + /// } + /// ``` + #[config(env = "MIRRORD_AGENT_METRICS")] + pub metrics: Option, + /// /// Create an agent that returns an error after accepting the first client. For testing /// purposes. Only supported with job agents (not with ephemeral agents). diff --git a/mirrord/kube/src/api/container/job.rs b/mirrord/kube/src/api/container/job.rs index b8d702f13f9..907aefffeeb 100644 --- a/mirrord/kube/src/api/container/job.rs +++ b/mirrord/kube/src/api/container/job.rs @@ -152,8 +152,6 @@ where "disabled".to_string(), ), ("app".to_string(), "mirrord".to_string()), - ("prometheus.io/scrape".to_string(), "true".to_string()), - ("prometheus.io/port".to_string(), "9000".to_string()), ])); let mut annotations = config @@ -165,9 +163,6 @@ where annotations.extend(BTreeMap::from([ ("sidecar.istio.io/inject".to_string(), "false".to_string()), ("linkerd.io/inject".to_string(), "disabled".to_string()), - ("prometheus.io/scrape".to_string(), "true".to_string()), - // ("prometheus.io/path".to_string(), "/metrics".to_string()), - ("prometheus.io/port".to_string(), "9000".to_string()), ])); pod.labels_mut().extend(labels.clone()); diff --git a/mirrord/kube/src/api/container/pod.rs b/mirrord/kube/src/api/container/pod.rs index 9ecb9317cea..f8461e8a002 100644 --- a/mirrord/kube/src/api/container/pod.rs +++ b/mirrord/kube/src/api/container/pod.rs @@ -106,8 +106,6 @@ impl ContainerVariant for PodVariant<'_> { [ ("sidecar.istio.io/inject".to_string(), "false".to_string()), ("linkerd.io/inject".to_string(), "disabled".to_string()), - ("prometheus.io/scrape".to_string(), "true".to_string()), - ("prometheus.io/port".to_string(), "9000".to_string()), ] .into(), ), @@ -118,8 +116,6 @@ impl ContainerVariant for PodVariant<'_> { "disabled".to_string(), ), ("app".to_string(), "mirrord".to_string()), - ("prometheus.io/scrape".to_string(), "true".to_string()), - ("prometheus.io/port".to_string(), "9000".to_string()), ] .into(), ), diff --git a/mirrord/kube/src/api/container/util.rs b/mirrord/kube/src/api/container/util.rs index 77f917378ce..bde5fb647a0 100644 --- a/mirrord/kube/src/api/container/util.rs +++ b/mirrord/kube/src/api/container/util.rs @@ -4,7 +4,7 @@ use futures::{AsyncBufReadExt, TryStreamExt}; use k8s_openapi::api::core::v1::{EnvVar, Pod, Toleration}; use kube::{api::LogParams, Api}; use mirrord_config::agent::{AgentConfig, LinuxCapability}; -use mirrord_protocol::{AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV}; +use mirrord_protocol::{AGENT_METRICS_ENV, AGENT_NETWORK_INTERFACE_ENV, AGENT_OPERATOR_CERT_ENV}; use regex::Regex; use tracing::warn; @@ -59,6 +59,7 @@ pub(super) fn agent_env(agent: &AgentConfig, params: &&ContainerParams) -> Vec Vec