Skip to content

Commit

Permalink
FLEET-19 Use correct metrics form
Browse files Browse the repository at this point in the history
Also improve how we handle refreshing the System
object, rather than doing it in either the callback
or the async task, contain that within a struct
and do it there.
  • Loading branch information
jonathanrainer committed Nov 8, 2024
1 parent 8a4e66e commit 51cc5ae
Showing 1 changed file with 99 additions and 19 deletions.
118 changes: 99 additions & 19 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,105 @@
use std::env::consts::ARCH;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
use std::time::Instant;

use opentelemetry::metrics::MeterProvider;
use opentelemetry_api::metrics::ObservableGauge;
use opentelemetry_api::metrics::Unit;
use schemars::JsonSchema;
use serde::Deserialize;
use sysinfo::System;
use tokio::task::JoinHandle;
use tower::BoxError;
use tracing::debug;
use tracing::info;

use crate::metrics::meter_provider;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;

const REFRESH_INTERVAL: Duration = Duration::from_secs(60);

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {}

#[derive(Debug)]
struct SystemGetter {
system: System,
start: Instant,
}

impl SystemGetter {
fn new() -> Self {
let mut system = System::new();
system.refresh_all();
Self {
system,
start: Instant::now(),
}
}

fn get_system(&mut self) -> &System {
if self.start.elapsed() < REFRESH_INTERVAL {
&self.system
} else {
self.start = Instant::now();
self.system.refresh_cpu_all();
self.system.refresh_memory();
&self.system
}
}
}

#[derive(Debug)]
struct FleetDetector {
handle: JoinHandle<()>,
#[allow(dead_code)]
// We have to store a reference to the gauge otherwise it will be dropped once the plugin is
// initialised, even though it still has data to emit
freq_gauge: ObservableGauge<u64>,
}

#[derive(Debug, Default, Deserialize, JsonSchema)]
struct Conf {}

#[async_trait::async_trait]
impl Plugin for FleetDetector {
type Config = Conf;

async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
debug!("beginning environment detection");
debug!("spawning continuous detector task");
let handle = tokio::task::spawn(async {
let mut sys = System::new_all();
let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
debug!("beginning environment detection, spawning gauges");
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
let meter = meter_provider().meter("apollo/router");

let gauge_local_system_getter = system_getter.clone();
let freq_gauge = meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let mut system_getter = gauge_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init();

debug!("establishing metrics emission task");
let counter_local_system_getter = system_getter.clone();
let handle = tokio::task::spawn(async move {
let mut interval = tokio::time::interval(REFRESH_INTERVAL);
loop {
interval.tick().await;
sys.refresh_cpu_all();
sys.refresh_memory();
detect_cpu_values(&sys);
detect_memory_values(&sys);
let mut system_getter = counter_local_system_getter.lock().unwrap();
let system = system_getter.get_system();
detect_cpu_values(system);
detect_memory_values(system);
}
});

Ok(FleetDetector { handle })
Ok(FleetDetector { handle, freq_gauge })
}
}

Expand All @@ -47,11 +110,13 @@ impl Drop for FleetDetector {
}

fn detect_cpu_values(system: &System) {
let cpus = system.cpus();
let cpu_count = detect_cpu_count(system);
let cpu_freq = cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
info!(value.apollo.router.instance.cpu_freq = cpu_freq);
info!(counter.apollo.router.instance.cpu_count = cpu_count);
u64_counter!(
"apollo.router.instance.cpu_count",
"The number of CPUs reported by the instance the router is running on",
cpu_count,
host.arch = get_otel_arch()
);
}

#[cfg(not(target_os = "linux"))]
Expand Down Expand Up @@ -120,7 +185,22 @@ fn detect_cpu_count(system: &System) -> u64 {
}

fn detect_memory_values(system: &System) {
info!(counter.apollo.router.instance.total_memory = system.total_memory())
u64_counter!(
"apollo.router.instance.total_memory",
"The amount of memory reported by the instance the router is running on",
system.total_memory()
);
}

fn get_otel_arch() -> &'static str {
match ARCH {
"x86_64" => "amd64",
"aarch64" => "arm64",
"arm" => "arm32",
"powerpc" => "ppc32",
"powerpc64" => "ppc64",
a => a,
}
}

register_plugin!("apollo", "fleet_detector", FleetDetector);

0 comments on commit 51cc5ae

Please sign in to comment.