Skip to content

Commit

Permalink
This commit does a few things
Browse files Browse the repository at this point in the history
1. activate is made non-async. It must never fail and it must complete. async fns can halt execution before completion.
2. The spawned thread and channel for the fleet detector are removed. There's no need for these. Gauges will be called periodically for export.
3. Telemetry is converted to a PluginPrivate to allow uniform calls to activate.
  • Loading branch information
bryn committed Nov 15, 2024
1 parent c1ec7c7 commit 1730506
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 58 deletions.
23 changes: 17 additions & 6 deletions apollo-router/src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,8 +631,8 @@ pub(crate) trait PluginPrivate: Send + Sync + 'static {
MultiMap::new()
}

/// This is invoked once after the OTEL meter has been refreshed.
async fn activate(&self) {}
/// The point of no return, this plugin is about to go live
fn activate(&self) {}
}

#[async_trait]
Expand Down Expand Up @@ -680,6 +680,8 @@ where
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
PluginUnstable::web_endpoints(self)
}

fn activate(&self) {}
}

fn get_type_of<T>(_: &T) -> &'static str {
Expand Down Expand Up @@ -737,8 +739,8 @@ pub(crate) trait DynPlugin: Send + Sync + 'static {
#[cfg(test)]
fn as_any_mut(&mut self) -> &mut dyn std::any::Any;

/// This is invoked once after the OTEL meter has been refreshed.
async fn activate(&self) {}
/// The point of no return, this plugin is about to go live
fn activate(&self) {}
}

#[async_trait]
Expand Down Expand Up @@ -790,8 +792,17 @@ where
self
}

async fn activate(&self) {
self.activate().await
fn activate(&self) {
self.activate()
}
}

/// Converts any [`PluginPrivate`] implementation into a boxed [`DynPlugin`] trait object,
/// so callers can write `plugin.into()` instead of boxing by hand.
impl<T: PluginPrivate> From<T> for Box<dyn DynPlugin> {
    fn from(plugin: T) -> Self {
        Box::new(plugin)
    }
}

Expand Down
57 changes: 23 additions & 34 deletions apollo-router/src/plugins/fleet_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use opentelemetry_api::KeyValue;
use schemars::JsonSchema;
use serde::Deserialize;
use sysinfo::System;
use tokio::sync::mpsc::Sender;
use tower::BoxError;
use tracing::debug;

Expand Down Expand Up @@ -54,19 +53,22 @@ impl SystemGetter {
}
}

struct GaugeStore {
gauges: Vec<ObservableGauge<u64>>,
#[derive(Default)]
enum GaugeStore {
#[default]
Disabled,
Pending,
Active(Vec<ObservableGauge<u64>>),
}

impl GaugeStore {
fn new() -> Self {
GaugeStore { gauges: Vec::new() }
}
fn initialize_gauges(&mut self, system_getter: Arc<Mutex<SystemGetter>>) {
fn active() -> GaugeStore {
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
let meter = meter_provider().meter("apollo/router");
let mut gauges = Vec::new();
{
let system_getter = system_getter.clone();
self.gauges.push(
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_freq")
.with_description(
Expand All @@ -87,7 +89,7 @@ impl GaugeStore {
}
{
let system_getter = system_getter.clone();
self.gauges.push(
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.cpu_count")
.with_description(
Expand All @@ -105,7 +107,7 @@ impl GaugeStore {
}
{
let system_getter = system_getter.clone();
self.gauges.push(
gauges.push(
meter
.u64_observable_gauge("apollo.router.instance.total_memory")
.with_description(
Expand All @@ -124,47 +126,34 @@ impl GaugeStore {
.init(),
);
}
GaugeStore::Active(gauges)
}
}

#[derive(Debug)]
#[derive(Default)]
struct FleetDetector {
#[allow(dead_code)]
gauge_initializer: Sender<()>,
gauge_store: Mutex<GaugeStore>,
}
#[async_trait::async_trait]
impl PluginPrivate for FleetDetector {
type Config = Conf;

async fn new(_: PluginInit<Self::Config>) -> Result<Self, BoxError> {
debug!("initialising fleet detection plugin");
let (gauge_initializer, mut rx) = tokio::sync::mpsc::channel(1);

if env::var(APOLLO_TELEMETRY_DISABLED).is_ok() {
debug!("fleet detection disabled, no telemetry will be sent");
rx.close();
return Ok(FleetDetector { gauge_initializer });
return Ok(FleetDetector::default());
}

debug!("spawning gauge initializer task");
tokio::spawn(async move {
let mut gauge_store = GaugeStore::new();
let system_getter = Arc::new(Mutex::new(SystemGetter::new()));
while rx.recv().await.is_some() {
let system_getter = system_getter.clone();
gauge_store.initialize_gauges(system_getter);
}
});

Ok(FleetDetector { gauge_initializer })
Ok(FleetDetector {
gauge_store: Mutex::new(GaugeStore::Pending),
})
}

async fn activate(&self) {
if !self.gauge_initializer.is_closed() {
debug!("initializing gauges");
if let Err(e) = self.gauge_initializer.send(()).await {
debug!("failed to activate fleet detector plugin: {:?}", e);
}
/// Creates the fleet detection gauges once this plugin is about to go live.
///
/// Only a `Pending` store is switched to `Active`; any other state is left untouched,
/// so calling this again after activation changes nothing.
fn activate(&self) {
    // Activation must never fail, so recover the guard from a poisoned lock instead of
    // panicking. The store is only ever replaced by the single assignment below, so the
    // value behind a poisoned lock is still a complete `GaugeStore`.
    let mut store = self
        .gauge_store
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    if matches!(*store, GaugeStore::Pending) {
        *store = GaugeStore::active();
    }
}
}
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/telemetry/metrics/apollo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ mod test {
use super::studio::SingleStatsReport;
use super::*;
use crate::context::OPERATION_KIND;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::subscription;
use crate::plugins::telemetry::apollo;
use crate::plugins::telemetry::apollo::default_buffer_size;
Expand Down Expand Up @@ -364,7 +364,7 @@ mod test {
request_builder.header("accept", "multipart/mixed;subscriptionSpec=1.0");
}
TestHarness::builder()
.extra_plugin(plugin)
.extra_private_plugin(plugin)
.extra_plugin(create_subscription_plugin().await?)
.build_router()
.await?
Expand Down
13 changes: 6 additions & 7 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ use crate::layers::ServiceBuilderExt;
use crate::metrics::aggregation::MeterProviderType;
use crate::metrics::filter::FilterMeterProvider;
use crate::metrics::meter_provider;
use crate::plugin::Plugin;
use crate::plugin::PluginInit;
use crate::plugin::PluginPrivate;
use crate::plugins::telemetry::apollo::ForwardHeaders;
use crate::plugins::telemetry::apollo_exporter::proto::reports::trace::node::Id::ResponseName;
use crate::plugins::telemetry::apollo_exporter::proto::reports::StatsContext;
Expand Down Expand Up @@ -133,7 +133,6 @@ use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_OPERATI
use crate::plugins::telemetry::tracing::TracingConfigurator;
use crate::plugins::telemetry::utils::TracingUtils;
use crate::query_planner::OperationKind;
use crate::register_plugin;
use crate::router_factory::Endpoint;
use crate::services::execution;
use crate::services::router;
Expand Down Expand Up @@ -280,7 +279,7 @@ fn create_builtin_instruments(config: &InstrumentsConfig) -> BuiltinInstruments
}

#[async_trait::async_trait]
impl Plugin for Telemetry {
impl PluginPrivate for Telemetry {
type Config = config::Conf;

async fn new(init: PluginInit<Self::Config>) -> Result<Self, BoxError> {
Expand Down Expand Up @@ -853,10 +852,8 @@ impl Plugin for Telemetry {
fn web_endpoints(&self) -> MultiMap<ListenAddr, Endpoint> {
self.custom_endpoints.clone()
}
}

impl Telemetry {
pub(crate) fn activate(&self) {
fn activate(&self) {
let mut activation = self.activation.lock();
if activation.is_active {
return;
Expand Down Expand Up @@ -910,7 +907,9 @@ impl Telemetry {
reload_fmt(create_fmt_layer(&self.config));
activation.is_active = true;
}
}

impl Telemetry {
fn create_propagator(config: &config::Conf) -> TextMapCompositePropagator {
let propagation = &config.exporters.tracing.propagation;

Expand Down Expand Up @@ -1979,7 +1978,7 @@ fn handle_error_internal<T: Into<opentelemetry::global::Error>>(
}
}

register_plugin!("apollo", "telemetry", Telemetry);
register_private_plugin!("apollo", "telemetry", Telemetry);

fn request_ftv1(mut req: SubgraphRequest) -> SubgraphRequest {
if req
Expand Down
4 changes: 2 additions & 2 deletions apollo-router/src/plugins/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,12 @@ use crate::Notify;
/// You can pass in a configuration and a schema to the test harness. If you pass in a schema, the test harness will create a query planner and use the schema to extract subgraph schemas.
///
///
pub(crate) struct PluginTestHarness<T: Plugin> {
pub(crate) struct PluginTestHarness<T: Into<Box<dyn DynPlugin>>> {
plugin: Box<dyn DynPlugin>,
phantom: std::marker::PhantomData<T>,
}
#[buildstructor::buildstructor]
impl<T: Plugin> PluginTestHarness<T> {
impl<T: Into<Box<dyn DynPlugin + 'static>> + 'static> PluginTestHarness<T> {
#[builder]
pub(crate) async fn new<'a, 'b>(config: Option<&'a str>, schema: Option<&'b str>) -> Self {
let factory = crate::plugin::plugins()
Expand Down
7 changes: 1 addition & 6 deletions apollo-router/src/services/supergraph/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ use crate::plugins::telemetry::config_new::events::log_event;
use crate::plugins::telemetry::config_new::events::SupergraphEventResponse;
use crate::plugins::telemetry::consts::QUERY_PLANNING_SPAN_NAME;
use crate::plugins::telemetry::tracing::apollo_telemetry::APOLLO_PRIVATE_DURATION_NS;
use crate::plugins::telemetry::Telemetry;
use crate::plugins::telemetry::LOGGING_DISPLAY_BODY;
use crate::plugins::traffic_shaping::TrafficShaping;
use crate::plugins::traffic_shaping::APOLLO_TRAFFIC_SHAPING;
Expand Down Expand Up @@ -810,11 +809,7 @@ impl PluggableSupergraphServiceBuilder {
// Activate every plugin (including telemetry).
// We must NOT fail to go live with the new router from this point as the telemetry plugin activate interacts with globals.
for (_, plugin) in self.plugins.iter() {
if let Some(telemetry) = plugin.as_any().downcast_ref::<Telemetry>() {
telemetry.activate();
} else {
plugin.activate().await;
}
plugin.activate();
}

// We need a non-fallible hook so that once we know we are going live with a pipeline we do final initialization.
Expand Down
2 changes: 1 addition & 1 deletion apollo-router/src/test_harness.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<'a> TestHarness<'a> {
),
};

self.extra_plugins.push((name, Box::new(plugin)));
self.extra_plugins.push((name, plugin.into()));
self
}

Expand Down

0 comments on commit 1730506

Please sign in to comment.