diff --git a/Cargo.lock b/Cargo.lock index 30dd2087..11b161b7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1010,10 +1010,12 @@ dependencies = [ "axum-macros", "clap", "gdc_rust_types", + "http", "indexmap 1.9.3", "ndc-client", "ndc-test", "opentelemetry", + "opentelemetry-http", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_api", diff --git a/justfile b/justfile new file mode 100644 index 00000000..cdd09d29 --- /dev/null +++ b/justfile @@ -0,0 +1,23 @@ +# re-build on code changes, and run the reference agent each time a build is +# successful +dev: + cargo watch \ + -x test \ + -x 'run --bin ndc_hub_example \ + -- serve --configuration <(echo 'null') \ + --otlp-endpoint http://localhost:4317' + +# reformat everything +format: + cargo fmt --all + +# is everything formatted? +format-check: + cargo fmt --all --check + +# run `clippy` linter +lint *FLAGS: + cargo clippy {{FLAGS}} + +lint-apply *FLAGS: + cargo clippy --fix {{FLAGS}} diff --git a/rust-connector-sdk/Cargo.toml b/rust-connector-sdk/Cargo.toml index 7aa786d8..3851c2a5 100644 --- a/rust-connector-sdk/Cargo.toml +++ b/rust-connector-sdk/Cargo.toml @@ -18,13 +18,15 @@ axum-macros = "^0.3.7" clap = { version = "^4.3.9", features = ["derive", "env"] } ndc-client = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.6" } ndc-test = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.6" } +http = "^0.2" opentelemetry = { version = "^0.20", features = [ "rt-tokio", "trace", ], default-features = false } opentelemetry_api = "^0.20.0" -opentelemetry_sdk = "^0.20.0" +opentelemetry-http = "0.9.0" opentelemetry-otlp = { version = "^0.13.0", features = ["reqwest-client"] } +opentelemetry_sdk = "^0.20.0" opentelemetry-semantic-conventions = "^0.12.0" prometheus = "^0.13.3" reqwest = "^0.11.20" diff --git a/rust-connector-sdk/src/default_main.rs b/rust-connector-sdk/src/default_main.rs index d21f4c77..96e97e89 100644 --- a/rust-connector-sdk/src/default_main.rs +++ b/rust-connector-sdk/src/default_main.rs @@ -4,6 +4,7 @@ use crate::{ check_health, connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError}, routes, + tracing::{init_tracing, make_span, on_response}, }; use async_trait::async_trait; @@ -23,22 +24,13 @@ use ndc_client::models::{ QueryRequest, QueryResponse, SchemaResponse, }; use ndc_test::report; -use opentelemetry::{global, sdk::propagation::TraceContextPropagator}; -use opentelemetry_api::KeyValue; -use opentelemetry_otlp::{WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT}; -use opentelemetry_sdk::trace::Sampler; use prometheus::Registry; use schemars::{schema::RootSchema, JsonSchema}; use serde::{de::DeserializeOwned, Serialize}; use std::error::Error; use std::net; -use std::{env, process::exit}; -use tower_http::{ - cors::CorsLayer, - trace::{DefaultMakeSpan, TraceLayer}, -}; -use tracing::Level; -use tracing_subscriber::{prelude::*, EnvFilter}; +use std::process::exit; +use tower_http::{cors::CorsLayer, trace::TraceLayer}; use self::v2_compat::SourceConfig; @@ -162,57 +154,6 @@ where } } -fn init_tracing(serve_command: &ServeCommand) -> Result<(), Box> { - global::set_text_map_propagator(TraceContextPropagator::new()); - - let service_name = serve_command - .service_name - .clone() - .unwrap_or(env!("CARGO_PKG_NAME").to_string()); - - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter( - opentelemetry_otlp::new_exporter().tonic().with_endpoint( - serve_command - .otlp_endpoint - .clone() - .unwrap_or(OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT.into()), - ), - ) - .with_trace_config( - opentelemetry::sdk::trace::config() - .with_resource(opentelemetry::sdk::Resource::new(vec![ - KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_NAME, - service_name, - ), - KeyValue::new( - opentelemetry_semantic_conventions::resource::SERVICE_VERSION, - env!("CARGO_PKG_VERSION"), - ), - ])) - .with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))), - ) - .install_batch(opentelemetry::runtime::Tokio)?; - - tracing_subscriber::registry() - .with( - tracing_opentelemetry::layer() - .with_exception_field_propagation(true) - .with_tracer(tracer), - ) - .with(EnvFilter::builder().parse("info,otel::tracing=trace,otel=debug")?) - .with( - tracing_subscriber::fmt::layer() - .json() - .with_timer(tracing_subscriber::fmt::time::time()), - ) - .init(); - - Ok(()) -} - async fn serve( serve_command: ServeCommand, ) -> Result<(), Box> @@ -221,7 +162,8 @@ where C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone, C::State: Sync + Send + Clone, { - init_tracing(&serve_command).expect("Unable to initialize tracing"); + init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint) + .expect("Unable to initialize tracing"); let server_state = init_server_state::(serve_command.configuration).await; @@ -333,7 +275,8 @@ where router .layer( TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::default().level(Level::INFO)), + .make_span_with(make_span) + .on_response(on_response), ) .layer(ValidateRequestHeaderLayer::custom( move |request: &mut Request| { @@ -376,7 +319,8 @@ where .route("/explain", post(v2_compat::post_explain::)) .layer( TraceLayer::new_for_http() - .make_span_with(DefaultMakeSpan::default().level(Level::INFO)), + .make_span_with(make_span) + .on_response(on_response), ) .layer(ValidateRequestHeaderLayer::custom( move |request: &mut Request| { diff --git a/rust-connector-sdk/src/lib.rs b/rust-connector-sdk/src/lib.rs index e858af90..f5d504e2 100644 --- a/rust-connector-sdk/src/lib.rs +++ b/rust-connector-sdk/src/lib.rs @@ -2,5 +2,6 @@ pub mod check_health; pub mod connector; pub mod default_main; pub mod routes; +pub mod tracing; pub use ndc_client::models; diff --git a/rust-connector-sdk/src/tracing.rs b/rust-connector-sdk/src/tracing.rs new file mode 100644 index 00000000..2d845a70 --- /dev/null +++ b/rust-connector-sdk/src/tracing.rs @@ -0,0 +1,107 @@ +use opentelemetry::{global, sdk::propagation::TraceContextPropagator}; +use opentelemetry_api::KeyValue; +use opentelemetry_otlp::{WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT}; +use opentelemetry_sdk::trace::Sampler; +use std::env; +use std::error::Error; +use tracing_subscriber::EnvFilter; + +use axum::body::{Body, BoxBody}; +use http::{Request, Response, Uri}; +use opentelemetry_http::HeaderExtractor; +use std::time::Duration; +use tracing::{Level, Span}; +use tracing_opentelemetry::OpenTelemetrySpanExt; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; + +pub fn init_tracing( + service_name: &Option, + otlp_endpoint: &Option, +) -> Result<(), Box> { + global::set_text_map_propagator(TraceContextPropagator::new()); + + let service_name = service_name + .clone() + .unwrap_or(env!("CARGO_PKG_NAME").to_string()); + + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter( + opentelemetry_otlp::new_exporter().tonic().with_endpoint( + otlp_endpoint + .clone() + .unwrap_or(OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT.into()), + ), + ) + .with_trace_config( + opentelemetry::sdk::trace::config() + .with_resource(opentelemetry::sdk::Resource::new(vec![ + KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_NAME, + service_name, + ), + KeyValue::new( + opentelemetry_semantic_conventions::resource::SERVICE_VERSION, + env!("CARGO_PKG_VERSION"), + ), + ])) + .with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))), + ) + .install_batch(opentelemetry::runtime::Tokio)?; + + tracing_subscriber::registry() + .with( + tracing_opentelemetry::layer() + .with_exception_field_propagation(true) + .with_tracer(tracer), + ) + .with(EnvFilter::builder().parse("info,otel::tracing=trace,otel=debug")?) + .with( + tracing_subscriber::fmt::layer() + .json() + .with_timer(tracing_subscriber::fmt::time::time()), + ) + .init(); + + Ok(()) +} +// Custom function for creating request-level spans +// tracing crate requires all fields to be defined at creation time, so any fields that will be set +// later should be defined as Empty +pub fn make_span(request: &Request) -> Span { + let span = tracing::span!( + Level::INFO, + "request", + method = %request.method(), + uri = %request.uri(), + version = ?request.version(), + deployment_id = extract_deployment_id(request.uri()), + status = tracing::field::Empty, + latency = tracing::field::Empty, + ); + + // Get parent trace id from headers, if available + // This uses OTel extension set_parent rather than setting field directly on the span to ensure + // it works no matter which propagator is configured + let parent_context = global::get_text_map_propagator(|propagator| { + propagator.extract(&HeaderExtractor(request.headers())) + }); + span.set_parent(parent_context); + + span +} + +// Rough implementation of extracting deployment ID from URI. Regex might be better? +fn extract_deployment_id(uri: &Uri) -> &str { + let path = uri.path(); + let mut parts = path.split('/').filter(|x| !x.is_empty()); + let _ = parts.next().unwrap_or_default(); + parts.next().unwrap_or("unknown") +} + +// Custom function for adding information to request-level span that is only available at response time. +pub fn on_response(response: &Response, latency: Duration, span: &Span) { + span.record("status", tracing::field::display(response.status())); + span.record("latency", tracing::field::display(latency.as_nanos())); +}