From e3e5c2403205b6c56d0cef1b30e30ced7e01c6d4 Mon Sep 17 00:00:00 2001 From: jonathanrainer Date: Thu, 7 Nov 2024 13:39:20 +0000 Subject: [PATCH] FLEET-19 Move to gauges Ensures that gauges will survive hot reload --- apollo-router/src/plugins/fleet_detector.rs | 158 ++++++++++++-------- 1 file changed, 95 insertions(+), 63 deletions(-) diff --git a/apollo-router/src/plugins/fleet_detector.rs b/apollo-router/src/plugins/fleet_detector.rs index 55ab128bae..011ed306ff 100644 --- a/apollo-router/src/plugins/fleet_detector.rs +++ b/apollo-router/src/plugins/fleet_detector.rs @@ -7,16 +7,17 @@ use std::time::Instant; use opentelemetry::metrics::MeterProvider; use opentelemetry_api::metrics::ObservableGauge; use opentelemetry_api::metrics::Unit; +use opentelemetry_api::KeyValue; use schemars::JsonSchema; use serde::Deserialize; use sysinfo::System; -use tokio::task::JoinHandle; +use tokio::sync::mpsc::Sender; use tower::BoxError; use tracing::debug; use crate::metrics::meter_provider; -use crate::plugin::Plugin; use crate::plugin::PluginInit; +use crate::plugin::PluginPrivate; const REFRESH_INTERVAL: Duration = Duration::from_secs(60); @@ -51,74 +52,113 @@ impl SystemGetter { } } +struct GaugeStore { + gauges: Vec>, +} + +impl GaugeStore { + fn new() -> Self { + GaugeStore { gauges: Vec::new() } + } + fn initialize_gauges(&mut self, system_getter: Arc>) { + let meter = meter_provider().meter("apollo/router"); + { + let system_getter = system_getter.clone(); + self.gauges.push( + meter + .u64_observable_gauge("apollo.router.instance.cpu_freq") + .with_description( + "The CPU frequency of the underlying instance the router is deployed to", + ) + .with_unit(Unit::new("Mhz")) + .with_callback(move |i| { + let local_system_getter = system_getter.clone(); + let mut system_getter = local_system_getter.lock().unwrap(); + let system = system_getter.get_system(); + let cpus = system.cpus(); + let cpu_freq = + cpus.iter().map(|cpu| cpu.frequency()).sum::() / cpus.len() as u64; + i.observe(cpu_freq, &[]) + }) + .init(), + ); + } + { + let system_getter = system_getter.clone(); + self.gauges.push( + meter + .u64_observable_gauge("apollo.router.instance.cpu_count") + .with_description( + "The number of CPUs reported by the instance the router is running on", + ) + .with_callback(move |i| { + let local_system_getter = system_getter.clone(); + let mut system_getter = local_system_getter.lock().unwrap(); + let system = system_getter.get_system(); + let cpu_count = detect_cpu_count(system); + i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())]) + }) + .init(), + ); + } + { + let system_getter = system_getter.clone(); + self.gauges.push( + meter + .u64_observable_gauge("apollo.router.instance.total_memory") + .with_description( + "The amount of memory reported by the instance the router is running on", + ) + .with_callback(move |i| { + let local_system_getter = system_getter.clone(); + let mut system_getter = local_system_getter.lock().unwrap(); + let system = system_getter.get_system(); + i.observe( + system.total_memory(), + &[KeyValue::new("host.arch", get_otel_arch())], + ) + }) + .with_unit(Unit::new("bytes")) + .init(), + ); + } + } +} + #[derive(Debug)] struct FleetDetector { - handle: JoinHandle<()>, #[allow(dead_code)] - // We have to store a reference to the gauge otherwise it will be dropped once the plugin is - // initialised, even though it still has data to emit - freq_gauge: ObservableGauge, + gauge_initializer: Sender<()>, } - #[async_trait::async_trait] -impl Plugin for FleetDetector { +impl PluginPrivate for FleetDetector { type Config = Conf; async fn new(_: PluginInit) -> Result { - debug!("beginning environment detection, spawning gauges"); - let system_getter = Arc::new(Mutex::new(SystemGetter::new())); - let meter = meter_provider().meter("apollo/router"); - - let gauge_local_system_getter = system_getter.clone(); - let freq_gauge = meter - .u64_observable_gauge("apollo.router.instance.cpu_freq") - .with_description( - "The CPU frequency of the underlying instance the router is deployed to", - ) - .with_unit(Unit::new("Mhz")) - .with_callback(move |i| { - let mut system_getter = gauge_local_system_getter.lock().unwrap(); - let system = system_getter.get_system(); - let cpus = system.cpus(); - let cpu_freq = - cpus.iter().map(|cpu| cpu.frequency()).sum::() / cpus.len() as u64; - i.observe(cpu_freq, &[]) - }) - .init(); - - debug!("establishing metrics emission task"); - let counter_local_system_getter = system_getter.clone(); - let handle = tokio::task::spawn(async move { - let mut interval = tokio::time::interval(REFRESH_INTERVAL); - loop { - interval.tick().await; - let mut system_getter = counter_local_system_getter.lock().unwrap(); - let system = system_getter.get_system(); - detect_cpu_values(system); - detect_memory_values(system); + debug!("initialising fleet detection plugin"); + let (gauge_initializer, mut rx) = tokio::sync::mpsc::channel(1); + + debug!("spawning gauge initializer task"); + tokio::spawn(async move { + let mut gauge_store = GaugeStore::new(); + let system_getter = Arc::new(Mutex::new(SystemGetter::new())); + while rx.recv().await.is_some() { + let system_getter = system_getter.clone(); + gauge_store.initialize_gauges(system_getter); } }); - Ok(FleetDetector { handle, freq_gauge }) + Ok(FleetDetector { gauge_initializer }) } -} -impl Drop for FleetDetector { - fn drop(&mut self) { - self.handle.abort(); + async fn activate(&self) { + debug!("initializing gauges"); + if let Err(e) = self.gauge_initializer.send(()).await { + debug!("failed to activate fleet detector plugin: {:?}", e); + } } } -fn detect_cpu_values(system: &System) { - let cpu_count = detect_cpu_count(system); - u64_counter!( - "apollo.router.instance.cpu_count", - "The number of CPUs reported by the instance the router is running on", - cpu_count, - host.arch = get_otel_arch() - ); -} - #[cfg(not(target_os = "linux"))] fn detect_cpu_count(system: &System) -> u64 { system.cpus().len() as u64 @@ -184,14 +224,6 @@ fn detect_cpu_count(system: &System) -> u64 { } } -fn detect_memory_values(system: &System) { - u64_counter!( - "apollo.router.instance.total_memory", - "The amount of memory reported by the instance the router is running on", - system.total_memory() - ); -} - fn get_otel_arch() -> &'static str { match ARCH { "x86_64" => "amd64", @@ -203,4 +235,4 @@ fn get_otel_arch() -> &'static str { } } -register_plugin!("apollo", "fleet_detector", FleetDetector); +register_private_plugin!("apollo", "fleet_detector", FleetDetector);