Skip to content

Commit

Permalink
Update opentelemetry libraries (#1406)
Browse files Browse the repository at this point in the history
* Update opentelemetry libraries

otel has been released with the changes we needed, we can now avoid parsing header env var manually.

Fixes #1217

* Use clustername/nodename as the instance id in otel
  • Loading branch information
jackkleeman authored Apr 16, 2024
1 parent bad9322 commit 6bb5b87
Show file tree
Hide file tree
Showing 20 changed files with 180 additions and 248 deletions.
207 changes: 76 additions & 131 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ hyper-rustls = { version = "0.24.1", features = ["http2"] }
itertools = "0.11.0"
metrics = { version = "0.22" }
once_cell = "1.18"
opentelemetry = { version = "0.20.0" }
opentelemetry-http = { version = "0.9.0" }
opentelemetry_api = { version = "0.20.0" }
opentelemetry = { version = "0.22.0" }
opentelemetry-http = { version = "0.11.1" }
opentelemetry_sdk = { version = "0.22.1" }
paste = "1.0"
pin-project = "1.0"
prost = "0.12.1"
Expand Down Expand Up @@ -147,7 +147,7 @@ tonic-build = "0.11.0"
tower = "0.4"
tower-http = { version = "0.4", default-features = false }
tracing = "0.1"
tracing-opentelemetry = { version = "0.21.0" }
tracing-opentelemetry = { version = "0.23.0" }
tracing-subscriber = { version = "0.3", features = ["env-filter", "fmt"] }
tracing-test = { version = "0.2.4" }
ulid = { version = "1.1.0" }
Expand All @@ -170,4 +170,4 @@ strip = true # Automatically strip symbols from the binary.

[profile.bench]
# Should be enabled for benchmarking runs; increases binary size
debug = true
debug = true
1 change: 1 addition & 0 deletions crates/ingress-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ http-old = { package = "http", version = "0.2.12" }

# Tracing
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use http::{HeaderMap, Request};
use opentelemetry::propagation::{Extractor, TextMapPropagator};
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use std::task::{Context, Poll};
use tower::{Layer, Service};

Expand Down
3 changes: 1 addition & 2 deletions crates/ingress-kafka/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
derive_builder = { workspace = true }
drain = { workspace = true }
opentelemetry_api = { workspace = true }
opentelemetry = { workspace = true }
prost = { workspace = true }
rdkafka = { version = "0.34", features = ["libz-static", "cmake-build"] }
schemars = { workspace = true, optional = true }
Expand All @@ -45,4 +45,3 @@ restate-types = { workspace = true, features = ["test-util"] }

base64 = { workspace = true }
serde_json = { workspace = true }

2 changes: 1 addition & 1 deletion crates/ingress-kafka/src/consumer_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use base64::Engine;
use bytes::Bytes;
use opentelemetry_api::trace::TraceContextExt;
use opentelemetry::trace::TraceContextExt;
use rdkafka::consumer::{Consumer, DefaultConsumerContext, StreamConsumer};
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-impl/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ hyper = { workspace = true, features = ["http1", "http2", "client", "tcp", "stre
itertools = { workspace = true }
metrics = { workspace = true }
opentelemetry = { workspace = true }
opentelemetry_sdk = { workspace = true }
opentelemetry-http = { workspace = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/invoker-impl/src/invocation_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ use hyper::http::response::Parts as ResponseParts;
use hyper::http::HeaderValue;
use hyper::{http, Body, HeaderMap, Response};
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use opentelemetry_http::HeaderInjector;
use opentelemetry_sdk::propagation::TraceContextPropagator;
use restate_errors::warn_it;
use restate_invoker_api::{
EagerState, EntryEnricher, InvokeInputJournal, JournalReader, StateReader,
Expand Down
4 changes: 2 additions & 2 deletions crates/storage-proto/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ publish = false

[features]
default = []
conversion = ["dep:restate-types", "dep:restate-storage-api", "dep:thiserror", "dep:anyhow", "dep:bytes", "dep:bytestring", "dep:opentelemetry_api"]
conversion = ["dep:restate-types", "dep:restate-storage-api", "dep:thiserror", "dep:anyhow", "dep:bytes", "dep:bytestring", "dep:opentelemetry"]

[dependencies]
restate-types = { workspace = true, optional = true }
Expand All @@ -18,7 +18,7 @@ restate-storage-api = { workspace = true, optional = true }
anyhow = { workspace = true, optional = true }
bytes = { workspace = true, optional = true }
bytestring = { workspace = true, optional = true }
opentelemetry_api = { workspace = true, optional = true }
opentelemetry = { workspace = true, optional = true }
prost = { workspace = true }
prost-types = { workspace = true }
thiserror = { workspace = true, optional = true }
Expand Down
17 changes: 8 additions & 9 deletions crates/storage-proto/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub mod storage {
use anyhow::anyhow;
use bytes::{Buf, Bytes};
use bytestring::ByteString;
use opentelemetry_api::trace::TraceState;
use opentelemetry::trace::TraceState;
use restate_storage_api::StorageError;
use restate_types::errors::{IdDecodeError, InvocationError};
use restate_types::invocation::{InvocationTermination, TerminationFlavor};
Expand Down Expand Up @@ -1151,9 +1151,8 @@ pub mod storage {
} = value;

let trace_id = try_bytes_into_trace_id(trace_id)?;
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
let trace_flags = opentelemetry_api::trace::TraceFlags::new(
let span_id = opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
let trace_flags = opentelemetry::trace::TraceFlags::new(
u8::try_from(trace_flags).map_err(ConversionError::invalid_data)?,
);

Expand All @@ -1167,7 +1166,7 @@ pub mod storage {

Ok(
restate_types::invocation::ServiceInvocationSpanContext::new(
opentelemetry_api::trace::SpanContext::new(
opentelemetry::trace::SpanContext::new(
trace_id,
span_id,
trace_flags,
Expand Down Expand Up @@ -1210,7 +1209,7 @@ pub mod storage {
match value.kind.ok_or(ConversionError::missing_field("kind"))? {
span_relation::Kind::Parent(span_relation::Parent { span_id }) => {
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
Ok(Self::Parent(span_id))
}
span_relation::Kind::Linked(span_relation::Linked {
Expand All @@ -1219,7 +1218,7 @@ pub mod storage {
}) => {
let trace_id = try_bytes_into_trace_id(trace_id)?;
let span_id =
opentelemetry_api::trace::SpanId::from_bytes(span_id.to_be_bytes());
opentelemetry::trace::SpanId::from_bytes(span_id.to_be_bytes());
Ok(Self::Linked(trace_id, span_id))
}
}
Expand All @@ -1246,7 +1245,7 @@ pub mod storage {

fn try_bytes_into_trace_id(
mut bytes: Bytes,
) -> Result<opentelemetry_api::trace::TraceId, ConversionError> {
) -> Result<opentelemetry::trace::TraceId, ConversionError> {
if bytes.len() != 16 {
return Err(ConversionError::InvalidData(anyhow!(
"trace id pb definition needs to contain exactly 16 bytes"
Expand All @@ -1256,7 +1255,7 @@ pub mod storage {
let mut bytes_array = [0; 16];
bytes.copy_to_slice(&mut bytes_array);

Ok(opentelemetry_api::trace::TraceId::from_bytes(bytes_array))
Ok(opentelemetry::trace::TraceId::from_bytes(bytes_array))
}

impl TryFrom<ServiceInvocationResponseSink>
Expand Down
12 changes: 5 additions & 7 deletions crates/tracing-instrumentation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,15 @@ console-subscriber = { version = "0.2.0", optional = true }
derive_builder = { workspace = true }
nu-ansi-term = "0.46.0"
once_cell = { workspace = true }
opentelemetry = { workspace = true, features = ["rt-tokio"] }
opentelemetry-contrib = { version = "0.12.0", features = ["jaeger_json_exporter", "rt-tokio"] }
opentelemetry-otlp = { version = "0.13.0" }
opentelemetry-semantic-conventions = "0.12.0"
opentelemetry = { workspace = true }
opentelemetry-contrib = { version = "0.14.0", features = ["jaeger_json_exporter", "rt-tokio"] }
opentelemetry-otlp = { version = "0.15.0" }
opentelemetry_sdk = { workspace = true }
opentelemetry-semantic-conventions = "0.14.0"
schemars = { workspace = true, optional = true }
serde = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, optional = true }
# pin the version that opentelemetry-otlp uses; this dependency can be removed when otlp crate is released with
# support for building tonic metadata from env vars
tonic = { version = "0.9.2" }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true, features = ["json"] }
Expand Down
81 changes: 25 additions & 56 deletions crates/tracing-instrumentation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@ use crate::pretty::PrettyFields;
use crate::processor::ResourceModifyingSpanProcessor;
use crate::tracer::SpanModifyingTracer;
use metrics_tracing_context::MetricsLayer;
use opentelemetry::sdk::trace::BatchSpanProcessor;
use opentelemetry::trace::{TraceError, TracerProvider};
use opentelemetry::KeyValue;
use opentelemetry_contrib::trace::exporter::jaeger_json::JaegerJsonExporter;
use opentelemetry_otlp::{SpanExporterBuilder, WithExportConfig};
use opentelemetry_sdk::trace::BatchSpanProcessor;
use pretty::Pretty;
use restate_types::config::{CommonOptions, LogFormat};
use std::env;
use std::fmt::Display;
use std::str::FromStr;
use tonic::metadata::{MetadataKey, MetadataMap, MetadataValue};
use tracing::{info, warn, Level};
use tracing_subscriber::filter::{Filtered, ParseError};
use tracing_subscriber::fmt::time::SystemTime;
Expand Down Expand Up @@ -54,7 +53,6 @@ pub enum Error {
fn build_tracing_layer<S>(
common_opts: &CommonOptions,
service_name: String,
instance_id: impl Display,
) -> Result<
Option<
Filtered<tracing_opentelemetry::OpenTelemetryLayer<S, SpanModifyingTracer>, EnvFilter, S>,
Expand All @@ -69,32 +67,41 @@ where
return Ok(None);
}

let resource = opentelemetry::sdk::Resource::new(vec![
opentelemetry_semantic_conventions::resource::SERVICE_NAME.string(service_name.clone()),
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE.string("Restate"),
opentelemetry_semantic_conventions::resource::SERVICE_INSTANCE_ID
.string(instance_id.to_string()),
opentelemetry_semantic_conventions::resource::SERVICE_VERSION
.string(env!("CARGO_PKG_VERSION")),
let resource = opentelemetry_sdk::Resource::new(vec![
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
service_name.clone(),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAMESPACE,
"Restate",
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_INSTANCE_ID,
format!("{}/{}", common_opts.cluster_name(), common_opts.node_name()),
),
KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
]);

// the following logic is based on `opentelemetry_otlp::span::build_batch_with_exporter`
// but also injecting ResourceModifyingSpanProcessor around the BatchSpanProcessor

let mut tracer_provider_builder = opentelemetry::sdk::trace::TracerProvider::builder()
.with_config(opentelemetry::sdk::trace::config().with_resource(resource));
let mut tracer_provider_builder = opentelemetry_sdk::trace::TracerProvider::builder()
.with_config(opentelemetry_sdk::trace::config().with_resource(resource));

if let Some(endpoint) = &common_opts.tracing_endpoint {
let exporter = SpanExporterBuilder::from(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(endpoint)
.with_metadata(parse_headers_from_env()),
.with_endpoint(endpoint),
)
.build_span_exporter()?;
tracer_provider_builder =
tracer_provider_builder.with_span_processor(ResourceModifyingSpanProcessor::new(
BatchSpanProcessor::builder(exporter, opentelemetry::runtime::Tokio).build(),
BatchSpanProcessor::builder(exporter, opentelemetry_sdk::runtime::Tokio).build(),
));
}

Expand All @@ -106,9 +113,9 @@ where
path.into(),
"trace".to_string(),
service_name,
opentelemetry::runtime::Tokio,
opentelemetry_sdk::runtime::Tokio,
),
opentelemetry::runtime::Tokio,
opentelemetry_sdk::runtime::Tokio,
)
.build(),
));
Expand All @@ -134,42 +141,6 @@ where
))
}

// Until https://github.com/open-telemetry/opentelemetry-rust/pull/1377 is released, we copy its logic
// here to include trace headers in OTLP requests
fn parse_headers_from_env() -> MetadataMap {
env::var("OTEL_EXPORTER_OTLP_TRACES_HEADERS")
.or_else(|_| env::var("OTEL_EXPORTER_OTLP_HEADERS"))
.map(|input| {
let iter = parse_header_string(&input).filter_map(|(key, value)| {
Some((
MetadataKey::from_str(key).ok()?,
MetadataValue::try_from(value).ok()?,
))
});
let (_, upper) = iter.size_hint();
let mut map = MetadataMap::with_capacity(upper.unwrap_or_default());
iter.for_each(|(key, value)| {
map.insert(key, value);
});
map
})
.unwrap_or_default()
}

fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, &str)> {
value
.split_terminator(',')
.map(str::trim)
.filter_map(parse_header_key_value_string)
}

fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, &str)> {
key_value_string
.split_once('=')
.map(|(key, value)| (key.trim(), value.trim()))
.filter(|(key, value)| !key.is_empty() && !value.is_empty())
}

#[allow(clippy::type_complexity)]
fn build_logging_layer<S>(
common_opts: &CommonOptions,
Expand Down Expand Up @@ -210,7 +181,6 @@ where
pub fn init_tracing_and_logging(
common_opts: &CommonOptions,
service_name: impl Display,
instance_id: impl Display,
) -> Result<TracingGuard, Error> {
let restate_service_name = format!("Restate service: {service_name}");

Expand All @@ -233,7 +203,6 @@ pub fn init_tracing_and_logging(
let layers = layers.with(build_tracing_layer(
common_opts,
restate_service_name.clone(),
instance_id,
)?);

layers.init();
Expand Down
14 changes: 8 additions & 6 deletions crates/tracing-instrumentation/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use opentelemetry::sdk::export::trace::SpanData;
use opentelemetry::sdk::trace::Span;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::TraceResult;
use opentelemetry::{Context, Key, KeyValue};
use opentelemetry_sdk::export::trace::SpanData;
use opentelemetry_sdk::trace::Span;
use opentelemetry_sdk::Resource;
use opentelemetry_semantic_conventions::resource::SERVICE_NAME;
use std::borrow::Cow;

Expand All @@ -33,7 +33,7 @@ impl<T> ResourceModifyingSpanProcessor<T> {
}
}

impl<T: opentelemetry::sdk::trace::SpanProcessor> opentelemetry::sdk::trace::SpanProcessor
impl<T: opentelemetry_sdk::trace::SpanProcessor> opentelemetry_sdk::trace::SpanProcessor
for ResourceModifyingSpanProcessor<T>
{
fn on_start(&self, span: &mut Span, cx: &Context) {
Expand All @@ -43,9 +43,11 @@ impl<T: opentelemetry::sdk::trace::SpanProcessor> opentelemetry::sdk::trace::Spa
fn on_end(&self, data: SpanData) {
let mut data = data;

if let Some(service_name) = data.attributes.get(&RPC_SERVICE) {
if let Some(service_name_attribute) =
data.attributes.iter().find(|kv| kv.key == RPC_SERVICE)
{
data.resource = Cow::Owned(data.resource.merge(&Resource::new(std::iter::once(
KeyValue::new(SERVICE_NAME, service_name.clone()),
KeyValue::new(SERVICE_NAME, service_name_attribute.value.clone()),
))));
};

Expand Down
Loading

0 comments on commit 6bb5b87

Please sign in to comment.