Skip to content

Commit

Permalink
FLEET-19 Add GaugeStore so as not to lose the Gauges
Browse files Browse the repository at this point in the history
  • Loading branch information
jonathanrainer committed Nov 7, 2024
1 parent d939c33 commit e47a7a1
Showing 1 changed file with 75 additions and 70 deletions.
145 changes: 75 additions & 70 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,9 @@ use opentelemetry_api::KeyValue;
use schemars::JsonSchema;
use serde::Deserialize;
use sysinfo::System;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::mpsc::Sender;
use tower::BoxError;
use tracing::debug;
use tracing::warn;

use crate::metrics::meter_provider;
use crate::plugin::PluginInit;
Expand Down Expand Up @@ -54,6 +52,79 @@ impl SystemGetter {
}
}

struct GaugeStore {
gauges: Vec<ObservableGauge<u64>>,
}

impl GaugeStore {
fn new() -> Self {
GaugeStore { gauges: Vec::new() }
}
fn initialize_gauges(&mut self, system_getter: Arc<Mutex<SystemGetter>>) {
let meter = meter_provider().meter("apollo/router");
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_count")
.with_description(
"The number of CPUs reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpu_count = detect_cpu_count(system);
i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
self.gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.total_memory")
.with_description(
"The amount of memory reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
i.observe(
system.total_memory(),
&[KeyValue::new("host.arch", get_otel_arch())],
)
})
.with_unit(Unit::new("bytes"))
.init(),
);
}
}
}

#[derive(Debug)]
struct FleetDetector {
#[allow(dead_code)]
Expand All @@ -69,11 +140,11 @@ impl PluginPrivate for FleetDetector {

debug!("spawning gauge initializer task");
tokio::spawn(async move {
let mut gauges = vec![];
let mut gauge_store = GaugeStore::new();
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
while let Some(_) = rx.recv().await {
let system_getter = system_getter.clone();
gauges = initialize_gauges(system_getter);
gauge_store.initialize_gauges(system_getter);
}
});

Expand Down Expand Up @@ -164,70 +235,4 @@ fn get_otel_arch() -> &'static str {
}
}

fn initialize_gauges(system_getter: Arc<Mutex<SystemGetter>>) -> Vec<ObservableGauge<u64>> {
let mut gauges = Vec::new();
let meter = meter_provider().meter("apollo/router");
{
let system_getter = system_getter.clone();
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
"The CPU frequency of the underlying instance the router is deployed to",
)
.with_unit(Unit::new("Mhz"))
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpus = system.cpus();
let cpu_freq =
cpus.iter().map(|cpu| cpu.frequency()).sum::<u64>() / cpus.len() as u64;
i.observe(cpu_freq, &[])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_count")
.with_description(
"The number of CPUs reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
let cpu_count = detect_cpu_count(system);
i.observe(cpu_count, &[KeyValue::new("host.arch", get_otel_arch())])
})
.init(),
);
}
{
let system_getter = system_getter.clone();
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.total_memory")
.with_description(
"The amount of memory reported by the instance the router is running on",
)
.with_callback(move |i| {
let local_system_getter = system_getter.clone();
let mut system_getter = local_system_getter.lock().unwrap();
let system = system_getter.get_system();
i.observe(
system.total_memory(),
&[KeyValue::new("host.arch", get_otel_arch())],
)
})
.with_unit(Unit::new("bytes"))
.init(),
);
}
gauges
}

register_private_plugin!("apollo", "fleet_detector", FleetDetector);

0 comments on commit e47a7a1

Please sign in to comment.