Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP tonic metadata from env variable #1377

Merged
merged 8 commits into from
Nov 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-otlp/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- Add `grpcio` metrics exporter (#1202)
- Allow specifying OTLP HTTP headers from env variable (#1290)
- Support custom channels in topic exporters [#1335](https://github.com/open-telemetry/opentelemetry-rust/pull/1335)
- Allow specifying OTLP Tonic metadata from env variable (#1377)

### Changed

Expand Down
44 changes: 8 additions & 36 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use super::default_headers;
use super::{default_headers, parse_header_string};

#[cfg(feature = "metrics")]
mod metrics;
Expand Down Expand Up @@ -316,46 +316,18 @@ fn resolve_endpoint(

#[allow(clippy::mutable_key_type)] // http headers are not mutated
fn add_header_from_string(input: &str, headers: &mut HashMap<HeaderName, HeaderValue>) {
for pair in input.split_terminator(',') {
if pair.trim().is_empty() {
continue;
}
if let Some((k, v)) = pair.trim().split_once('=') {
if !k.trim().is_empty() && !v.trim().is_empty() {
if let (Ok(key), Ok(value)) = (
HeaderName::from_str(k.trim()),
HeaderValue::from_str(v.trim()),
) {
headers.insert(key, value);
}
}
}
}
headers.extend(parse_header_string(input).filter_map(|(key, value)| {
Some((
HeaderName::from_str(key).ok()?,
HeaderValue::from_str(value).ok()?,
))
}));
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
use crate::{OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_EXPORTER_OTLP_TRACES_ENDPOINT};
use std::sync::Mutex;

// Make sure env tests are not running concurrently
static ENV_LOCK: Mutex<()> = Mutex::new(());

fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
fn test_append_signal_path_to_generic_env() {
Expand Down
75 changes: 75 additions & 0 deletions opentelemetry-otlp/src/exporter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,78 @@ impl<B: HasExportConfig> WithExportConfig for B {
self
}
}

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn parse_header_string(value: &str) -> impl Iterator<Item = (&str, &str)> {
value
.split_terminator(',')
.map(str::trim)
.filter_map(parse_header_key_value_string)
}

#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
fn parse_header_key_value_string(key_value_string: &str) -> Option<(&str, &str)> {
key_value_string
.split_once('=')
.map(|(key, value)| (key.trim(), value.trim()))
.filter(|(key, value)| !key.is_empty() && !value.is_empty())
}

#[cfg(test)]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
mod tests {
// Make sure env tests are not running concurrently
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

pub(crate) fn run_env_test<T, F>(env_vars: T, f: F)
where
F: FnOnce(),
T: Into<Vec<(&'static str, &'static str)>>,
{
let _env_lock = ENV_LOCK.lock().expect("env test lock poisoned");
let env_vars = env_vars.into();
for (k, v) in env_vars.iter() {
std::env::set_var(k, v);
}
f();
for (k, _) in env_vars {
std::env::remove_var(k);
}
}

#[test]
fn test_parse_header_string() {
let test_cases = vec![
// Format: (input_str, expected_headers)
("k1=v1", vec![("k1", "v1")]),
("k1=v1,k2=v2", vec![("k1", "v1"), ("k2", "v2")]),
("k1=v1=10,k2,k3", vec![("k1", "v1=10")]),
("k1=v1,,,k2,k3=10", vec![("k1", "v1"), ("k3", "10")]),
];

for (input_str, expected_headers) in test_cases {
assert_eq!(
super::parse_header_string(input_str).collect::<Vec<_>>(),
expected_headers,
)
}
}

#[test]
fn test_parse_header_key_value_string() {
let test_cases = vec![
// Format: (input_str, expected_header)
("k1=v1", Some(("k1", "v1"))),
("", None),
("=v1", None),
("k1=", None),
];

for (input_str, expected_headers) in test_cases {
assert_eq!(
super::parse_header_key_value_string(input_str),
expected_headers,
)
}
}
}
108 changes: 105 additions & 3 deletions opentelemetry-otlp/src/exporter/tonic/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use std::env;
use std::fmt::{Debug, Formatter};
use std::str::FromStr;
use std::time::Duration;

use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::codec::CompressionEncoding;
use tonic::metadata::{KeyAndValueRef, MetadataMap};
use tonic::service::Interceptor;
use tonic::transport::Channel;
#[cfg(feature = "tls")]
use tonic::transport::ClientTlsConfig;

use super::default_headers;
use super::{default_headers, parse_header_string};
use crate::exporter::Compression;
use crate::{
ExportConfig, OTEL_EXPORTER_OTLP_COMPRESSION, OTEL_EXPORTER_OTLP_ENDPOINT,
OTEL_EXPORTER_OTLP_TIMEOUT,
OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TIMEOUT,
};

#[cfg(feature = "logs")]
Expand Down Expand Up @@ -213,11 +215,17 @@
signal_endpoint_path: &str,
signal_timeout_var: &str,
signal_compression_var: &str,
signal_headers_var: &str,
) -> Result<(Channel, BoxInterceptor, Option<CompressionEncoding>), crate::Error> {
let tonic_config = self.tonic_config;
let compression = resolve_compression(&tonic_config, signal_compression_var)?;

let metadata = tonic_config.metadata.unwrap_or_default();
let headers_from_env = parse_headers_from_env(signal_headers_var);
let metadata = merge_metadata_with_headers_from_env(
tonic_config.metadata.unwrap_or_default(),
headers_from_env,
);

let add_metadata = move |mut req: tonic::Request<()>| {
for key_and_value in metadata.iter() {
match key_and_value {
Expand Down Expand Up @@ -294,6 +302,7 @@
"/v1/logs",
crate::logs::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_COMPRESSION,
crate::logs::OTEL_EXPORTER_OTLP_LOGS_HEADERS,

Check warning on line 305 in opentelemetry-otlp/src/exporter/tonic/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/mod.rs#L305

Added line #L305 was not covered by tests
)?;

let client = TonicLogsClient::new(channel, interceptor, compression);
Expand All @@ -316,6 +325,7 @@
"/v1/metrics",
crate::metric::OTEL_EXPORTER_OTLP_METRICS_TIMEOUT,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_COMPRESSION,
crate::metric::OTEL_EXPORTER_OTLP_METRICS_HEADERS,

Check warning on line 328 in opentelemetry-otlp/src/exporter/tonic/mod.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/mod.rs#L328

Added line #L328 was not covered by tests
)?;

let client = TonicMetricsClient::new(channel, interceptor, compression);
Expand All @@ -339,6 +349,7 @@
"/v1/traces",
crate::span::OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
crate::span::OTEL_EXPORTER_OTLP_TRACES_COMPRESSION,
crate::span::OTEL_EXPORTER_OTLP_TRACES_HEADERS,
)?;

let client = TonicTracesClient::new(channel, interceptor, compression);
Expand All @@ -347,11 +358,44 @@
}
}

fn merge_metadata_with_headers_from_env(
metadata: MetadataMap,
headers_from_env: HeaderMap,
) -> MetadataMap {
if headers_from_env.is_empty() {
metadata
} else {
let mut existing_headers: HeaderMap = metadata.into_headers();
existing_headers.extend(headers_from_env);

MetadataMap::from_headers(existing_headers)
}
}

fn parse_headers_from_env(signal_headers_var: &str) -> HeaderMap {
env::var(signal_headers_var)
.or_else(|_| env::var(OTEL_EXPORTER_OTLP_HEADERS))
.map(|input| {
parse_header_string(&input)
.filter_map(|(key, value)| {
Some((
HeaderName::from_str(key).ok()?,
HeaderValue::from_str(value).ok()?,
))
})
.collect::<HeaderMap>()
})
.unwrap_or_default()
}

#[cfg(test)]
mod tests {
use crate::exporter::tests::run_env_test;
#[cfg(feature = "gzip-tonic")]
use crate::exporter::Compression;
use crate::TonicExporterBuilder;
use crate::{OTEL_EXPORTER_OTLP_HEADERS, OTEL_EXPORTER_OTLP_TRACES_HEADERS};
use http::{HeaderMap, HeaderName, HeaderValue};
use tonic::metadata::{MetadataMap, MetadataValue};

#[test]
Expand Down Expand Up @@ -393,4 +437,62 @@
let builder = TonicExporterBuilder::default().with_compression(Compression::Gzip);
assert_eq!(builder.tonic_config.compression.unwrap(), Compression::Gzip);
}

#[test]
fn test_parse_headers_from_env() {
run_env_test(
vec![
(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2"),
(OTEL_EXPORTER_OTLP_HEADERS, "k3=v3"),
],
|| {
assert_eq!(
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS),
HeaderMap::from_iter([
(
HeaderName::from_static("k1"),
HeaderValue::from_static("v1")
),
(
HeaderName::from_static("k2"),
HeaderValue::from_static("v2")
),
])
);

assert_eq!(
super::parse_headers_from_env("EMPTY_ENV"),
HeaderMap::from_iter([(
HeaderName::from_static("k3"),
HeaderValue::from_static("v3")
)])
);
},
)
}

#[test]
fn test_merge_metadata_with_headers_from_env() {
run_env_test(
vec![(OTEL_EXPORTER_OTLP_TRACES_HEADERS, "k1=v1,k2=v2")],
|| {
let headers_from_env =
super::parse_headers_from_env(OTEL_EXPORTER_OTLP_TRACES_HEADERS);

let mut metadata = MetadataMap::new();
metadata.insert("foo", "bar".parse().unwrap());
metadata.insert("k1", "v0".parse().unwrap());

let result =
super::merge_metadata_with_headers_from_env(metadata, headers_from_env);

assert_eq!(
result.get("foo").unwrap(),
MetadataValue::from_static("bar")
);
assert_eq!(result.get("k1").unwrap(), MetadataValue::from_static("v1"));
assert_eq!(result.get("k2").unwrap(), MetadataValue::from_static("v2"));
},
);
}
}
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,12 +349,12 @@ pub enum Error {
RequestFailed(#[from] opentelemetry_http::HttpError),

/// The provided value is invalid in HTTP headers.
#[cfg(feature = "http-proto")]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
#[error("http header value error {0}")]
InvalidHeaderValue(#[from] http::header::InvalidHeaderValue),

/// The provided name is invalid in HTTP headers.
#[cfg(feature = "http-proto")]
#[cfg(any(feature = "grpc-tonic", feature = "http-proto"))]
#[error("http header name error {0}")]
InvalidHeaderName(#[from] http::header::InvalidHeaderName),

Expand Down
Loading