Skip to content

Commit

Permalink
Update opentelemetry libraries
Browse files Browse the repository at this point in the history
otel has been released with the changes we needed, we can now avoid parsing header env var manually.

Fixes restatedev#1217
  • Loading branch information
jackkleeman committed Apr 16, 2024
1 parent bad9322 commit fe40aac
Show file tree
Hide file tree
Showing 19 changed files with 177 additions and 239 deletions.
207 changes: 76 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ hyper-rustls = { version = "0.24.1", features = ["http2"] }
itertools = "0.11.0"
metrics = { version = "0.22" }
once_cell = "1.18"
opentelemetry = { version = "0.20.0" }
opentelemetry-http = { version = "0.9.0" }
opentelemetry_api = { version = "0.20.0" }
opentelemetry = { version = "0.22.0" }
opentelemetry-http = { version = "0.11.1" }
opentelemetry_sdk = { version = "0.22.1" }
paste = "1.0"
pin-project = "1.0"
prost = "0.12.1"
Expand Down Expand Up @@ -147,7 +147,7 @@ tonic-build = "0.11.0"
tower = "0.4"
tower-http = { version = "0.4", default-features = false }
tracing = "0.1"
tracing-opentelemetry = { version = "0.21.0" }
tracing-opentelemetry = { version = "0.23.0" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
tracing-test = { version = "0.2.4" }
ulid = { version = "1.1.0" }
Expand All @@ -170,4 +170,4 @@ strip = true # Automatically strip symbols from the binary.

[profile.bench]
# Should be enabled for benchmarking runs; increases binary size
debug = true
debug = true
1 change: 1 addition & 0 deletions crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ http-old = { package = "http", version = "0.2.12" }

# Tracing
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use http::{HeaderMap, Request};
use opentelemetry::propagation::{Extractor, TextMapPropagator};
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use std::task::{Context, Poll};
use tower::{Layer, Service};

Expand Down
3 changes: 1 addition & 2 deletions crates/ingress-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
drain = { workspace = true }
opentelemetry_api = { workspace = true }
opentelemetry = { workspace = true }
prost = { workspace = true }
rdkafka = { version = "0.34", features = ["libz-static", "cmake-build"] }
schemars = { workspace = true, optional = true }
Expand All @@ -45,4 +45,3 @@ restate-types = { workspace = true, features = ["test-util"] }

base64 = { workspace = true }
serde_json = { workspace = true }

2 changes: 1 addition & 1 deletion crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use base64::Engine;
use bytes::Bytes;
use opentelemetry_api::trace::TraceContextExt;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ hyper = { workspace = true, features = ["http1", "http2", "client", "tcp", "stre
itertools = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-http = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/invoker-impl/src/invocation_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use hyper::http::response::Parts as ResponseParts;
use hyper::http::HeaderValue;
use hyper::{http, Body, HeaderMap, Response};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use opentelemetry_http::HeaderInjector;
use restate_errors::warn_it;
use restate_invoker_api::{
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ publish = false

[features]
default = []
conversion = ["dep:restate-types", "dep:restate-storage-api", "dep:thiserror", "dep:anyhow", "dep:bytes", "dep:bytestring", "dep:opentelemetry_api"]
conversion = ["dep:restate-types", "dep:restate-storage-api", "dep:thiserror", "dep:anyhow", "dep:bytes", "dep:bytestring", "dep:opentelemetry"]

[dependencies]
restate-types = { workspace = true, optional = true }
Expand All @@ -18,7 +18,7 @@ restate-storage-api = { workspace = true, optional = true }
anyhow = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
bytestring = { workspace = true, optional = true }
opentelemetry_api = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true, optional = true }
Expand Down
17 changes: 8 additions & 9 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod storage {
use anyhow::anyhow;
use bytes::{Buf, Bytes};
use bytestring::ByteString;
use opentelemetry_api::trace::TraceState;
use opentelemetry::trace::TraceState;
use restate_storage_api::StorageError;
use restate_types::errors::{IdDecodeError, InvocationError};
use restate_types::invocation::{InvocationTermination, TerminationFlavor};
Expand Down Expand Up @@ -1151,9 +1151,8 @@ pub mod storage {
} = value;

let trace_id = try_bytes_into_trace_id(trace_id)?;
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
let trace_flags = opentelemetry_api::trace::TraceFlags::new(
let span_id = opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
let trace_flags = opentelemetry::trace::TraceFlags::new(
u8::try_from(trace_flags).map_err(ConversionError::invalid_data)?,
);

Expand All @@ -1167,7 +1166,7 @@ pub mod storage {

Ok(
restate_types::invocation::ServiceInvocationSpanContext::new(
opentelemetry_api::trace::SpanContext::new(
opentelemetry::trace::SpanContext::new(
trace_id,
span_id,
trace_flags,
Expand Down Expand Up @@ -1210,7 +1209,7 @@ pub mod storage {
match value.kind.ok_or(ConversionError::missing_field("kind"))? {
span_relation::Kind::Parent(span_relation::Parent { span_id }) => {
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
Ok(Self::Parent(span_id))
}
span_relation::Kind::Linked(span_relation::Linked {
Expand All @@ -1219,7 +1218,7 @@ pub mod storage {
}) => {
let trace_id = try_bytes_into_trace_id(trace_id)?;
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
Ok(Self::Linked(trace_id, span_id))
}
}
Expand All @@ -1246,7 +1245,7 @@ pub mod storage {

fn try_bytes_into_trace_id(
mut bytes: Bytes,
) -> Result<opentelemetry_api::trace::TraceId, ConversionError> {
) -> Result<opentelemetry::trace::TraceId, ConversionError> {
if bytes.len() != 16 {
return Err(ConversionError::InvalidData(anyhow!(
"trace id pb definition needs to contain exactly 16 bytes"
Expand All @@ -1256,7 +1255,7 @@ pub mod storage {
let mut bytes_array = [0; 16];
bytes.copy_to_slice(&mut bytes_array);

Ok(opentelemetry_api::trace::TraceId::from_bytes(bytes_array))
Ok(opentelemetry::trace::TraceId::from_bytes(bytes_array))
}

impl TryFrom<ServiceInvocationResponseSink>
Expand Down
12 changes: 5 additions & 7 deletions crates/tracing-instrumentation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ console-subscriber = { version = "0.2.0", optional = true }
derive_builder = { workspace = true }
nu-ansi-term = "0.46.0"
once_cell = { workspace = true }
opentelemetry = { workspace = true, features = ["rt-tokio"] }
opentelemetry-contrib = { version = "0.12.0", features = ["jaeger_json_exporter", "rt-tokio"] }
opentelemetry-otlp = { version = "0.13.0" }
opentelemetry-semantic-conventions = "0.12.0"
opentelemetry = { workspace = true }
opentelemetry-contrib = { version = "0.14.0", features = ["jaeger_json_exporter", "rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0" }
opentelemetry_sdk = { workspace = true }
opentelemetry-semantic-conventions = "0.14.0"
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
# pin the version that opentelemetry-otlp uses; this dependency can be removed when otlp crate is released with
# support for building tonic metadata from env vars
tonic = { version = "0.9.2" }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }
Expand Down
78 changes: 25 additions & 53 deletions crates/tracing-instrumentation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ use crate::pretty::PrettyFields;
use crate::processor::ResourceModifyingSpanProcessor;
use crate::tracer::SpanModifyingTracer;
use metrics_tracing_context::MetricsLayer;
use opentelemetry::sdk::trace::BatchSpanProcessor;
use opentelemetry::trace::{TraceError, TracerProvider};
use opentelemetry::KeyValue;
use opentelemetry_contrib::trace::exporter::jaeger_json::JaegerJsonExporter;
use opentelemetry_otlp::{SpanExporterBuilder, WithExportConfig};
use opentelemetry_sdk::trace::BatchSpanProcessor;
use pretty::Pretty;
use restate_types::config::{CommonOptions, LogFormat};
use std::env;
use std::fmt::Display;
use std::str::FromStr;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tracing::{info, warn, Level};
use tracing_subscriber::filter::{Filtered, ParseError};
use tracing_subscriber::fmt::time::SystemTime;
Expand Down Expand Up @@ -69,32 +68,41 @@ where
return Ok(None);
}

let resource = opentelemetry::sdk::Resource::new(vec![
opentelemetry_semantic_conventions::resource::SERVICE_NAME.string(service_name.clone()),
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE.string("Restate"),
opentelemetry_semantic_conventions::resource::SERVICE_INSTANCE_ID
.string(instance_id.to_string()),
opentelemetry_semantic_conventions::resource::SERVICE_VERSION
.string(env!("CARGO_PKG_VERSION")),
let resource = opentelemetry_sdk::Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name.clone(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE,
"Restate",
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_INSTANCE_ID,
instance_id.to_string(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
]);

// the following logic is based on `opentelemetry_otlp::span::build_batch_with_exporter`
// but also injecting ResourceModifyingSpanProcessor around the BatchSpanProcessor

let mut tracer_provider_builder = opentelemetry::sdk::trace::TracerProvider::builder()
.with_config(opentelemetry::sdk::trace::config().with_resource(resource));
let mut tracer_provider_builder = opentelemetry_sdk::trace::TracerProvider::builder()
.with_config(opentelemetry_sdk::trace::config().with_resource(resource));

if let Some(endpoint) = &common_opts.tracing_endpoint {
let exporter = SpanExporterBuilder::from(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
.with_metadata(parse_headers_from_env()),
.with_endpoint(endpoint),
)
.build_span_exporter()?;
tracer_provider_builder =
tracer_provider_builder.with_span_processor(ResourceModifyingSpanProcessor::new(
BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio).build(),
BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio).build(),
));
}

Expand All @@ -106,9 +114,9 @@ where
path.into(),
"trace".to_string(),
service_name,
opentelemetry::runtime::Tokio,
opentelemetry_sdk::runtime::Tokio,
),
opentelemetry::runtime::Tokio,
opentelemetry_sdk::runtime::Tokio,
)
.build(),
));
Expand All @@ -134,42 +142,6 @@ where
))
}

// Until https://github.com/open-telemetry/opentelemetry-rust/pull/1377 is released, we copy its logic
// here to include trace headers in OTLP requests
fn parse_headers_from_env() -> MetadataMap {
env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS")
.or_else(|_| env::var("OTEL_EXPORTER_OTLP_HEADERS"))
.map(|input| {
let iter = parse_header_string(&input).filter_map(|(key, value)| {
Some((
MetadataKey::from_str(key).ok()?,
MetadataValue::try_from(value).ok()?,
))
});
let (_, upper) = iter.size_hint();
let mut map = MetadataMap::with_capacity(upper.unwrap_or_default());
iter.for_each(|(key, value)| {
map.insert(key, value);
});
map
})
.unwrap_or_default()
}

fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, &str)> {
value
.split_terminator(',')
.map(str::trim)
.filter_map(parse_header_key_value_string)
}

fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, &str)> {
key_value_string
.split_once('=')
.map(|(key, value)| (key.trim(), value.trim()))
.filter(|(key, value)| !key.is_empty() && !value.is_empty())
}

#[allow(clippy::type_complexity)]
fn build_logging_layer<S>(
common_opts: &CommonOptions,
Expand Down
14 changes: 8 additions & 6 deletions crates/tracing-instrumentation/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::Span;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceResult;
use opentelemetry::{Context, Key, KeyValue};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::trace::Span;
use opentelemetry_sdk::Resource;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use std::borrow::Cow;

Expand All @@ -33,7 +33,7 @@ impl<T> ResourceModifyingSpanProcessor<T> {
}
}

impl<T: opentelemetry::sdk::trace::SpanProcessor> opentelemetry::sdk::trace::SpanProcessor
impl<T: opentelemetry_sdk::trace::SpanProcessor> opentelemetry_sdk::trace::SpanProcessor
for ResourceModifyingSpanProcessor<T>
{
fn on_start(&self, span: &mut Span, cx: &Context) {
Expand All @@ -43,9 +43,11 @@ impl<T: opentelemetry::sdk::trace::SpanProcessor> opentelemetry::sdk::trace::Spa
fn on_end(&self, data: SpanData) {
let mut data = data;

if let Some(service_name) = data.attributes.get(&RPC_SERVICE) {
if let Some(service_name_attribute) =
data.attributes.iter().find(|kv| kv.key == RPC_SERVICE)
{
data.resource = Cow::Owned(data.resource.merge(&Resource::new(std::iter::once(
KeyValue::new(SERVICE_NAME, service_name.clone()),
KeyValue::new(SERVICE_NAME, service_name_attribute.value.clone()),
))));
};

Expand Down
Loading

0 comments on commit fe40aac

Please sign in to comment.