Skip to content

Commit

Permalink
FLEET-19 Move to gauges
Browse files Browse the repository at this point in the history
Ensures that gauges will survive hot reload
  • Loading branch information
jonathanrainer committed Nov 12, 2024
1 parent aaa789d commit e3e5c24
Showing 1 changed file with 95 additions and 63 deletions.
158 changes: 95 additions & 63 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ use std::time::Instant;
use opentelemetry::metrics::MeterProvider;
use opentelemetry_api::metrics::ObservableGauge;
use opentelemetry_api::metrics::Unit;
use opentelemetry_api::KeyValue;
use schemars::JsonSchema;
use serde::Deserialize;
use sysinfo::System;
use tokio::task::JoinHandle;
use tokio::sync::mpsc::Sender;
use tower::BoxError;
use tracing::debug;

use crate::metrics::meter_provider;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;

const REFRESH_INTERVAL: Duration = Duration::from_secs(60);

Expand Down Expand Up @@ -51,74 +52,113 @@ impl SystemGetter {
}
}

struct GaugeStore {
gauges: Vec<ObservableGauge<u64>>,
}

impl GaugeStore {
fn new() -> Self {
GaugeStore { gauges: Vec::new() }
}
fn initialize_gauges(&mut self, system_getter: Arc<Mutex<SystemGetter>>) {
let meter = meter_provider().meter("apollo/router");
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_count")
.with_description(
"The number of CPUs reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpu_count = detect_cpu_count(system);
i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.total_memory")
.with_description(
"The amount of memory reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
i.observe(
system.total_memory(),
&[KeyValue::new("host.arch", get_otel_arch())],
)
})
.with_unit(Unit::new("bytes"))
.init(),
);
}
}
}

#[derive(Debug)]
struct FleetDetector {
handle: JoinHandle<()>,
#[allow(dead_code)]
// We have to store a reference to the gauge otherwise it will be dropped once the plugin is
// initialised, even though it still has data to emit
freq_gauge: ObservableGauge<u64>,
gauge_initializer: Sender<()>,
}

#[async_trait::async_trait]
impl Plugin for FleetDetector {
impl PluginPrivate for FleetDetector {
type Config = Conf;

async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
debug!("beginning environment detection, spawning gauges");
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
let meter = meter_provider().meter("apollo/router");

let gauge_local_system_getter = system_getter.clone();
let freq_gauge = meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let mut system_getter = gauge_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init();

debug!("establishing metrics emission task");
let counter_local_system_getter = system_getter.clone();
let handle = tokio::task::spawn(async move {
let mut interval = tokio::time::interval(REFRESH_INTERVAL);
loop {
interval.tick().await;
let mut system_getter = counter_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
detect_cpu_values(system);
detect_memory_values(system);
debug!("initialising fleet detection plugin");
let (gauge_initializer, mut rx) = tokio::sync::mpsc::channel(1);

debug!("spawning gauge initializer task");
tokio::spawn(async move {
let mut gauge_store = GaugeStore::new();
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
while rx.recv().await.is_some() {
let system_getter = system_getter.clone();
gauge_store.initialize_gauges(system_getter);
}
});

Ok(FleetDetector { handle, freq_gauge })
Ok(FleetDetector { gauge_initializer })
}
}

impl Drop for FleetDetector {
fn drop(&mut self) {
self.handle.abort();
async fn activate(&self) {
debug!("initializing gauges");
if let Err(e) = self.gauge_initializer.send(()).await {
debug!("failed to activate fleet detector plugin: {:?}", e);
}
}
}

fn detect_cpu_values(system: &System) {
let cpu_count = detect_cpu_count(system);
u64_counter!(
"apollo.router.instance.cpu_count",
"The number of CPUs reported by the instance the router is running on",
cpu_count,
host.arch = get_otel_arch()
);
}

#[cfg(not(target_os = "linux"))]
fn detect_cpu_count(system: &System) -> u64 {
system.cpus().len() as u64
Expand Down Expand Up @@ -184,14 +224,6 @@ fn detect_cpu_count(system: &System) -> u64 {
}
}

fn detect_memory_values(system: &System) {
u64_counter!(
"apollo.router.instance.total_memory",
"The amount of memory reported by the instance the router is running on",
system.total_memory()
);
}

fn get_otel_arch() -> &'static str {
match ARCH {
"x86_64" => "amd64",
Expand All @@ -203,4 +235,4 @@ fn get_otel_arch() -> &'static str {
}
}

register_plugin!("apollo", "fleet_detector", FleetDetector);
register_private_plugin!("apollo", "fleet_detector", FleetDetector);

0 comments on commit e3e5c24

Please sign in to comment.