Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
Gil Mizrahi committed Oct 26, 2023
1 parent d029d1a commit 22e31f2
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 11 deletions.
29 changes: 20 additions & 9 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
check_health,
connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError},
json_response::JsonResponse,
routes,
metrics, routes,
tracing::{init_tracing, make_span, on_response},
};

Expand Down Expand Up @@ -130,7 +130,8 @@ type Port = u16;
pub struct ServerState<C: Connector> {
configuration: C::Configuration,
state: C::State,
metrics: Registry,
metrics_registry: Registry,
metrics: metrics::Metrics,
}

/// A default main function for a connector.
Expand Down Expand Up @@ -255,14 +256,16 @@ where
.await
.unwrap();

let mut metrics = Registry::new();
let state = C::try_init_state(&configuration, &mut metrics)
let mut metrics_registry = Registry::new();
let metrics = metrics::Metrics::initialize(&mut metrics_registry).unwrap(); // todo: remove unwrap
let state = C::try_init_state(&configuration, &mut metrics_registry)
.await
.unwrap();

ServerState::<C> {
configuration,
state,
metrics_registry,
metrics,
}
}
Expand Down Expand Up @@ -384,7 +387,7 @@ where
async fn get_metrics<C: Connector>(
State(state): State<ServerState<C>>,
) -> Result<String, (StatusCode, Json<ErrorResponse>)> {
routes::get_metrics::<C>(&state.configuration, &state.state, state.metrics)
routes::get_metrics::<C>(&state.configuration, &state.state, state.metrics_registry)
}

async fn get_capabilities<C: Connector>() -> JsonResponse<CapabilitiesResponse> {
Expand All @@ -400,28 +403,36 @@ async fn get_health<C: Connector>(
async fn get_schema<C: Connector>(
State(state): State<ServerState<C>>,
) -> Result<JsonResponse<SchemaResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::get_schema::<C>(&state.configuration).await
state
.metrics
.record_status(routes::get_schema::<C>(&state.configuration).await)
}

async fn post_explain<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<QueryRequest>,
) -> Result<JsonResponse<ExplainResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_explain::<C>(&state.configuration, &state.state, request).await
state
.metrics
.record_status(routes::post_explain::<C>(&state.configuration, &state.state, request).await)
}

async fn post_mutation<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<MutationRequest>,
) -> Result<JsonResponse<MutationResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_mutation::<C>(&state.configuration, &state.state, request).await
state.metrics.record_status(
routes::post_mutation::<C>(&state.configuration, &state.state, request).await,
)
}

async fn post_query<C: Connector>(
State(state): State<ServerState<C>>,
request: Json<QueryRequest>,
) -> Result<JsonResponse<QueryResponse>, (StatusCode, Json<ErrorResponse>)> {
routes::post_query::<C>(&state.configuration, &state.state, request).await
state
.metrics
.record_status(routes::post_query::<C>(&state.configuration, &state.state, request).await)
}

async fn configuration<C: Connector + 'static>(
Expand Down
1 change: 1 addition & 0 deletions rust-connector-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod check_health;
pub mod connector;
pub mod default_main;
pub mod json_response;
pub mod metrics;
pub mod routes;
pub mod secret;
pub mod tracing;
Expand Down
145 changes: 145 additions & 0 deletions rust-connector-sdk/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
//! Some metrics setup and update.
use crate::json_response::JsonResponse;
use axum::{http::StatusCode, Json};
use ndc_client::models::ErrorResponse;
use prometheus;

/// The collection of some metrics exposed through the `/metrics` endpoint.
#[derive(Debug, Clone)]
pub struct Metrics {
status_codes: StatusCodeMetrics,
}

/// The collection of some metrics exposed through the `/metrics` endpoint.
#[derive(Debug, Clone)]
pub struct StatusCodeMetrics {
total_200: prometheus::IntCounter,
total_400: prometheus::IntCounter,
total_403: prometheus::IntCounter,
total_409: prometheus::IntCounter,
total_500: prometheus::IntCounter,
total_501: prometheus::IntCounter,
}

impl Metrics {
/// Set up counters and gauges used to produce Prometheus metrics
pub fn initialize(
metrics_registry: &mut prometheus::Registry,
) -> Result<Self, prometheus::Error> {
let total_200 = add_int_counter_metric(
metrics_registry,
"status_code_200",
"Total number of 200 status codes returned.",
)?;

let total_400 = add_int_counter_metric(
metrics_registry,
"status_code_400",
"Total number of 400 status codes returned.",
)?;

let total_403 = add_int_counter_metric(
metrics_registry,
"status_code_403",
"Total number of 403 status codes returned.",
)?;

let total_409 = add_int_counter_metric(
metrics_registry,
"status_code_409",
"Total number of 409 status codes returned.",
)?;

let total_500 = add_int_counter_metric(
metrics_registry,
"status_code_500",
"Total number of 500 status codes returned.",
)?;

let total_501 = add_int_counter_metric(
metrics_registry,
"status_code_501",
"Total number of 501 status codes returned.",
)?;

let status_codes = StatusCodeMetrics {
total_200,
total_400,
total_403,
total_409,
total_500,
total_501,
};

Ok(Self { status_codes })
}

/// record a status code from an api result.
pub fn record_status<T>(
&self,
result: Result<JsonResponse<T>, (StatusCode, Json<ErrorResponse>)>,
) -> Result<JsonResponse<T>, (StatusCode, Json<ErrorResponse>)> {
match result {
Ok(result) => {
self.record_status_code(StatusCode::OK);
Ok(result)
}
Err((status_code, result)) => {
self.record_status_code(status_code);
Err((status_code, result))
}
}
}

fn record_status_code(&self, status_code: StatusCode) {
match status_code {
StatusCode::OK => self.record_200(),
StatusCode::BAD_REQUEST => self.record_400(),
StatusCode::FORBIDDEN => self.record_403(),
StatusCode::CONFLICT => self.record_409(),
StatusCode::INTERNAL_SERVER_ERROR => self.record_500(),
StatusCode::NOT_IMPLEMENTED => self.record_501(),
_ => (),
}
}

fn record_200(&self) {
self.status_codes.total_200.inc()
}
fn record_400(&self) {
self.status_codes.total_400.inc()
}
fn record_403(&self) {
self.status_codes.total_403.inc()
}
fn record_409(&self) {
self.status_codes.total_409.inc()
}
fn record_500(&self) {
self.status_codes.total_500.inc()
}
fn record_501(&self) {
self.status_codes.total_501.inc()
}
}

/// Create a new int counter metric and register it with the provided Prometheus Registry
fn add_int_counter_metric(
metrics_registry: &mut prometheus::Registry,
metric_name: &str,
metric_description: &str,
) -> Result<prometheus::IntCounter, prometheus::Error> {
let int_counter =
prometheus::IntCounter::with_opts(prometheus::Opts::new(metric_name, metric_description))?;
register_collector(metrics_registry, int_counter)
}

/// Register a new collector with the registry, and returns it for later use.
fn register_collector<Collector: prometheus::core::Collector + std::clone::Clone + 'static>(
metrics_registry: &mut prometheus::Registry,
collector: Collector,
) -> Result<Collector, prometheus::Error> {
metrics_registry.register(Box::new(collector.clone()))?;
Ok(collector)
}
4 changes: 2 additions & 2 deletions rust-connector-sdk/src/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
pub fn get_metrics<C: Connector>(
configuration: &C::Configuration,
state: &C::State,
metrics: Registry,
metrics_registry: Registry,
) -> Result<String, (StatusCode, Json<models::ErrorResponse>)> {
let encoder = TextEncoder::new();

Expand All @@ -25,7 +25,7 @@ pub fn get_metrics<C: Connector>(
)
})?;

let metric_families = metrics.gather();
let metric_families = metrics_registry.gather();

encoder.encode_to_string(&metric_families).map_err(|_| {
(
Expand Down

0 comments on commit 22e31f2

Please sign in to comment.