Skip to content

Commit

Permalink
Merge pull request #51 from hasura/djh/copy-trace-propagation-from-mu…
Browse files Browse the repository at this point in the history
…ltitenant

chore: trace propagation implementation
  • Loading branch information
danieljharvey authored Oct 3, 2023
2 parents 9ee707f + 0ae23a7 commit 3248df5
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 66 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# re-build on code changes, and run the reference agent each time a build is
# successful
dev:
cargo watch \
-x test \
-x 'run --bin ndc_hub_example \
-- serve --configuration <(echo 'null') \
--otlp-endpoint http://localhost:4317'

# reformat everything
format:
cargo fmt --all

# is everything formatted?
format-check:
cargo fmt --all --check

# run `clippy` linter
lint *FLAGS:
cargo clippy {{FLAGS}}

lint-apply *FLAGS:
cargo clippy --fix {{FLAGS}}
4 changes: 3 additions & 1 deletion rust-connector-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,15 @@ axum-macros = "^0.3.7"
clap = { version = "^4.3.9", features = ["derive", "env"] }
ndc-client = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.6" }
ndc-test = { git = "http://github.com/hasura/ndc-spec.git", tag = "v0.1.0-rc.6" }
http = "^0.2"
opentelemetry = { version = "^0.20", features = [
"rt-tokio",
"trace",
], default-features = false }
opentelemetry_api = "^0.20.0"
opentelemetry_sdk = "^0.20.0"
opentelemetry-http = "0.9.0"
opentelemetry-otlp = { version = "^0.13.0", features = ["reqwest-client"] }
opentelemetry_sdk = "^0.20.0"
opentelemetry-semantic-conventions = "^0.12.0"
prometheus = "^0.13.3"
reqwest = "^0.11.20"
Expand Down
74 changes: 9 additions & 65 deletions rust-connector-sdk/src/default_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
check_health,
connector::{Connector, InvalidRange, SchemaError, UpdateConfigurationError},
routes,
tracing::{init_tracing, make_span, on_response},
};

use async_trait::async_trait;
Expand All @@ -23,22 +24,13 @@ use ndc_client::models::{
QueryRequest, QueryResponse, SchemaResponse,
};
use ndc_test::report;
use opentelemetry::{global, sdk::propagation::TraceContextPropagator};
use opentelemetry_api::KeyValue;
use opentelemetry_otlp::{WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT};
use opentelemetry_sdk::trace::Sampler;
use prometheus::Registry;
use schemars::{schema::RootSchema, JsonSchema};
use serde::{de::DeserializeOwned, Serialize};
use std::error::Error;
use std::net;
use std::{env, process::exit};
use tower_http::{
cors::CorsLayer,
trace::{DefaultMakeSpan, TraceLayer},
};
use tracing::Level;
use tracing_subscriber::{prelude::*, EnvFilter};
use std::process::exit;
use tower_http::{cors::CorsLayer, trace::TraceLayer};

use self::v2_compat::SourceConfig;

Expand Down Expand Up @@ -162,57 +154,6 @@ where
}
}

fn init_tracing(serve_command: &ServeCommand) -> Result<(), Box<dyn Error>> {
global::set_text_map_propagator(TraceContextPropagator::new());

let service_name = serve_command
.service_name
.clone()
.unwrap_or(env!("CARGO_PKG_NAME").to_string());

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
serve_command
.otlp_endpoint
.clone()
.unwrap_or(OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT.into()),
),
)
.with_trace_config(
opentelemetry::sdk::trace::config()
.with_resource(opentelemetry::sdk::Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name,
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
]))
.with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
)
.install_batch(opentelemetry::runtime::Tokio)?;

tracing_subscriber::registry()
.with(
tracing_opentelemetry::layer()
.with_exception_field_propagation(true)
.with_tracer(tracer),
)
.with(EnvFilter::builder().parse("info,otel::tracing=trace,otel=debug")?)
.with(
tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::time()),
)
.init();

Ok(())
}

async fn serve<C: Connector + Clone + Default + 'static>(
serve_command: ServeCommand,
) -> Result<(), Box<dyn Error>>
Expand All @@ -221,7 +162,8 @@ where
C::Configuration: Serialize + DeserializeOwned + Sync + Send + Clone,
C::State: Sync + Send + Clone,
{
init_tracing(&serve_command).expect("Unable to initialize tracing");
init_tracing(&serve_command.service_name, &serve_command.otlp_endpoint)
.expect("Unable to initialize tracing");

let server_state = init_server_state::<C>(serve_command.configuration).await;

Expand Down Expand Up @@ -333,7 +275,8 @@ where
router
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().level(Level::INFO)),
.make_span_with(make_span)
.on_response(on_response),
)
.layer(ValidateRequestHeaderLayer::custom(
move |request: &mut Request<Body>| {
Expand Down Expand Up @@ -376,7 +319,8 @@ where
.route("/explain", post(v2_compat::post_explain::<C>))
.layer(
TraceLayer::new_for_http()
.make_span_with(DefaultMakeSpan::default().level(Level::INFO)),
.make_span_with(make_span)
.on_response(on_response),
)
.layer(ValidateRequestHeaderLayer::custom(
move |request: &mut Request<Body>| {
Expand Down
1 change: 1 addition & 0 deletions rust-connector-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ pub mod check_health;
pub mod connector;
pub mod default_main;
pub mod routes;
pub mod tracing;

pub use ndc_client::models;
107 changes: 107 additions & 0 deletions rust-connector-sdk/src/tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
use opentelemetry::{global, sdk::propagation::TraceContextPropagator};
use opentelemetry_api::KeyValue;
use opentelemetry_otlp::{WithExportConfig, OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT};
use opentelemetry_sdk::trace::Sampler;
use std::env;
use std::error::Error;
use tracing_subscriber::EnvFilter;

use axum::body::{Body, BoxBody};
use http::{Request, Response, Uri};
use opentelemetry_http::HeaderExtractor;
use std::time::Duration;
use tracing::{Level, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

pub fn init_tracing(
service_name: &Option<String>,
otlp_endpoint: &Option<String>,
) -> Result<(), Box<dyn Error>> {
global::set_text_map_propagator(TraceContextPropagator::new());

let service_name = service_name
.clone()
.unwrap_or(env!("CARGO_PKG_NAME").to_string());

let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter().tonic().with_endpoint(
otlp_endpoint
.clone()
.unwrap_or(OTEL_EXPORTER_OTLP_ENDPOINT_DEFAULT.into()),
),
)
.with_trace_config(
opentelemetry::sdk::trace::config()
.with_resource(opentelemetry::sdk::Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name,
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
]))
.with_sampler(Sampler::ParentBased(Box::new(Sampler::AlwaysOn))),
)
.install_batch(opentelemetry::runtime::Tokio)?;

tracing_subscriber::registry()
.with(
tracing_opentelemetry::layer()
.with_exception_field_propagation(true)
.with_tracer(tracer),
)
.with(EnvFilter::builder().parse("info,otel::tracing=trace,otel=debug")?)
.with(
tracing_subscriber::fmt::layer()
.json()
.with_timer(tracing_subscriber::fmt::time::time()),
)
.init();

Ok(())
}
// Custom function for creating request-level spans
// tracing crate requires all fields to be defined at creation time, so any fields that will be set
// later should be defined as Empty
pub fn make_span(request: &Request<Body>) -> Span {
let span = tracing::span!(
Level::INFO,
"request",
method = %request.method(),
uri = %request.uri(),
version = ?request.version(),
deployment_id = extract_deployment_id(request.uri()),
status = tracing::field::Empty,
latency = tracing::field::Empty,
);

// Get parent trace id from headers, if available
// This uses OTel extension set_parent rather than setting field directly on the span to ensure
// it works no matter which propagator is configured
let parent_context = global::get_text_map_propagator(|propagator| {
propagator.extract(&HeaderExtractor(request.headers()))
});
span.set_parent(parent_context);

span
}

// Rough implementation of extracting deployment ID from URI. Regex might be better?
fn extract_deployment_id(uri: &Uri) -> &str {
let path = uri.path();
let mut parts = path.split('/').filter(|x| !x.is_empty());
let _ = parts.next().unwrap_or_default();
parts.next().unwrap_or("unknown")
}

// Custom function for adding information to request-level span that is only available at response time.
pub fn on_response(response: &Response<BoxBody>, latency: Duration, span: &Span) {
span.record("status", tracing::field::display(response.status()));
span.record("latency", tracing::field::display(latency.as_nanos()));
}

0 comments on commit 3248df5

Please sign in to comment.