diff --git a/Cargo.toml b/Cargo.toml
index 7859300b..9e60d682 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,4 +19,7 @@ opentelemetry-http = "0.27"
 opentelemetry-proto = { version = "0.27", default-features = false }
 opentelemetry_sdk = { version = "0.27", default-features = false }
 opentelemetry-stdout = "0.27"
-opentelemetry-semantic-conventions = { version = "0.27", features = ["semconv_experimental"] }
+opentelemetry-semantic-conventions = { version = "0.27", features = [
+    "semconv_experimental",
+] }
+criterion = "0.5"
diff --git a/opentelemetry-etw-metrics/Cargo.toml b/opentelemetry-etw-metrics/Cargo.toml
index a3eef4cb..4fae806c 100644
--- a/opentelemetry-etw-metrics/Cargo.toml
+++ b/opentelemetry-etw-metrics/Cargo.toml
@@ -17,10 +17,10 @@ opentelemetry-proto = { workspace = true, features = ["gen-tonic", "metrics"] }
 async-trait = "0.1"
 prost = "0.13"
 tracelogging = "1.2.1"
-tracing = {version = "0.1", optional = true}
-
+tracing = { version = "0.1", optional = true }
 [dev-dependencies]
 tokio = { version = "1.0", features = ["full"] }
+criterion = { workspace = true, features = ["html_reports"] }
 
 [features]
 internal-logs = ["tracing"]
@@ -28,3 +28,7 @@ default = ["internal-logs"]
 
 [package.metadata.cargo-machete]
 ignored = ["tracing"]
+
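+# Criterion benchmarks define their own main(), so the default libtest
+# bench harness must be disabled for this target.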
+[[bench]]
+name = "exporter"
+harness = false
diff --git a/opentelemetry-etw-metrics/benches/exporter.rs b/opentelemetry-etw-metrics/benches/exporter.rs
new file mode 100644
index 00000000..5ffa1e56
--- /dev/null
+++ b/opentelemetry-etw-metrics/benches/exporter.rs
@@ -0,0 +1,105 @@
+//! Run with `$ cargo bench --bench exporter -- --exact <test_name>` to run a specific benchmark.
+//! So to run the benchmark named "export" you would run `$ cargo bench --bench exporter -- --exact export`.
+//! To run all benchmarks in this file you would run `$ cargo bench --bench exporter`.
+//!
+/*
+The benchmark results:
+criterion = "0.5.1"
+OS: Windows 11 Enterprise N, 23H2, Build 22631.4460
+Hardware: Intel(R) Xeon(R) Platinum 8370C CPU @ 2.80 GHz, 16 vCPUs
+RAM: 64.0 GB
+| Test                           | Average time|
+|--------------------------------|-------------|
+| exporter                       | 22.203 ms   |
+*/
+
+use opentelemetry::{InstrumentationScope, KeyValue};
+use opentelemetry_etw_metrics::MetricsExporter;
+
+use opentelemetry_sdk::{
+    metrics::{
+        data::{DataPoint, Metric, ResourceMetrics, ScopeMetrics, Sum},
+        exporter::PushMetricExporter,
+        Temporality,
+    },
+    Resource,
+};
+
+use criterion::{criterion_group, criterion_main, Criterion};
+
+async fn export(exporter: &MetricsExporter, resource_metrics: &mut ResourceMetrics) {
+    exporter.export(resource_metrics).await.unwrap();
+}
+
+fn create_resource_metrics() -> ResourceMetrics {
+    // `Metric` does not implement `Clone`, so this helper function is used to create each metric.
+    fn create_metric() -> Metric {
+        let data_point = DataPoint {
+            attributes: vec![KeyValue::new("datapoint key", "datapoint value")],
+            start_time: Some(std::time::SystemTime::now()),
+            time: Some(std::time::SystemTime::now()),
+            value: 1.0_f64,
+            exemplars: vec![],
+        };
+
+        let sum: Sum<f64> = Sum {
+            data_points: vec![data_point.clone(); 2_000],
+            temporality: Temporality::Delta,
+            is_monotonic: true,
+        };
+
+        Metric {
+            name: "metric_name".into(),
+            description: "metric description".into(),
+            unit: "metric unit".into(),
+            data: Box::new(sum),
+        }
+    }
+
+    ResourceMetrics {
+        resource: Resource::new(vec![KeyValue::new("service.name", "my-service")]),
+        scope_metrics: vec![ScopeMetrics {
+            scope: InstrumentationScope::default(),
+            metrics: vec![
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+                create_metric(),
+            ],
+        }],
+    }
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+    let runtime = tokio::runtime::Builder::new_multi_thread()
+        .worker_threads(1)
+        .enable_all()
+        .build()
+        .unwrap();
+
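+    // `iter_custom` keeps exporter construction and payload creation outside
+    // the timed region: the clock starts only once the export loop begins.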
+    c.bench_function("export", move |b| {
+        b.iter_custom(|iters| {
+            let exporter = MetricsExporter::new();
+
+            let mut resource_metrics = create_resource_metrics();
+
+            let start = std::time::Instant::now();
+
+            for _i in 0..iters {
+                runtime.block_on(async {
+                    export(&exporter, &mut resource_metrics).await;
+                });
+            }
+            start.elapsed()
+        })
+    });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/opentelemetry-etw-metrics/src/exporter/mod.rs b/opentelemetry-etw-metrics/src/exporter/mod.rs
index 26a6155c..ce1fc3a6 100644
--- a/opentelemetry-etw-metrics/src/exporter/mod.rs
+++ b/opentelemetry-etw-metrics/src/exporter/mod.rs
@@ -1,21 +1,23 @@
-use opentelemetry::otel_warn;
+use crate::etw;
 
-use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
-use opentelemetry_sdk::metrics::{
-    data::{
-        self, ExponentialBucket, ExponentialHistogramDataPoint, Metric, ResourceMetrics,
-        ScopeMetrics,
+use opentelemetry::otel_warn;
+use opentelemetry_proto::tonic::{
+    collector::metrics::v1::ExportMetricsServiceRequest,
+    metrics::v1::{
+        metric::Data as TonicMetricData, ExponentialHistogram as TonicExponentialHistogram,
+        Gauge as TonicGauge, Histogram as TonicHistogram, Metric as TonicMetric,
+        ResourceMetrics as TonicResourceMetrics, ScopeMetrics as TonicScopeMetrics,
+        Sum as TonicSum, Summary as TonicSummary,
     },
-    exporter::PushMetricExporter,
-    MetricError, MetricResult, Temporality,
 };
-use prost::Message;
-
-use async_trait::async_trait;
+use opentelemetry_sdk::metrics::{
+    data::ResourceMetrics, exporter::PushMetricExporter, MetricError, MetricResult, Temporality,
+};
 use std::fmt::{Debug, Formatter};
 
-use crate::etw;
+use async_trait::async_trait;
+use prost::Message;
 
 pub struct MetricsExporter {}
@@ -39,265 +41,147 @@ impl Debug for MetricsExporter {
     }
 }
 
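+// Encodes a single ExportMetricsServiceRequest into `encoding_buffer` and
+// writes it as one ETW event; requests larger than `etw::MAX_EVENT_SIZE` are
+// dropped with a warning instead of being written.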
+fn emit_export_metric_service_request(
+    export_metric_service_request: &ExportMetricsServiceRequest,
+    encoding_buffer: &mut Vec<u8>,
+) -> MetricResult<()> {
+    if (export_metric_service_request.encoded_len()) > etw::MAX_EVENT_SIZE {
+        otel_warn!(name: "MetricExportFailedDueToMaxSizeLimit", size = export_metric_service_request.encoded_len(), max_size = etw::MAX_EVENT_SIZE);
+    } else {
+        encoding_buffer.resize_with(
+            export_metric_service_request.encoded_len(),
+            Default::default,
+        );
+
+        export_metric_service_request
+            .encode(encoding_buffer)
+            .map_err(|err| MetricError::Other(err.to_string()))?;
+
+        let result = etw::write(encoding_buffer);
+        // TODO: Better logging/internal metrics needed here for the non-failure
+        // case. Uncomment the line below to see the exported bytes until a
+        // better logging solution is implemented.
+        // println!("Exported {} bytes to ETW", encoding_buffer.len());
+        if result != 0 {
+            otel_warn!(name: "MetricExportFailed", error_code = result);
+        }
+    }
+
+    Ok(())
+}
+
 #[async_trait]
 impl PushMetricExporter for MetricsExporter {
     async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()> {
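+        // Each data point below is emitted as its own single-point request so
+        // that every encoded event stays within the ETW size limit enforced by
+        // `emit_export_metric_service_request`.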
+        let schema_url: String = metrics
+            .resource
+            .schema_url()
+            .map(Into::into)
+            .unwrap_or_default();
+
+        // TODO: Reuse this vec across exports by storing it in `MetricsExporter`
+        let mut encoding_buffer = Vec::new();
+
         for scope_metric in &metrics.scope_metrics {
             for metric in &scope_metric.metrics {
-                let mut resource_metrics = Vec::new();
-
-                let data = &metric.data.as_any();
-                if let Some(hist) = data.downcast_ref::<data::Histogram<i64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Histogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::Histogram<f64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Histogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<i64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::ExponentialHistogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![ExponentialHistogramDataPoint {
-                                            attributes: data_point.attributes.clone(),
-                                            count: data_point.count,
-                                            start_time: data_point.start_time,
-                                            time: data_point.time,
-                                            min: data_point.min,
-                                            max: data_point.max,
-                                            sum: data_point.sum,
-                                            scale: data_point.scale,
-                                            zero_count: data_point.zero_count,
-                                            zero_threshold: data_point.zero_threshold,
-                                            positive_bucket: ExponentialBucket {
-                                                offset: data_point.positive_bucket.offset,
-                                                counts: data_point.positive_bucket.counts.clone(),
-                                            },
-                                            negative_bucket: ExponentialBucket {
-                                                offset: data_point.negative_bucket.offset,
-                                                counts: data_point.negative_bucket.counts.clone(),
-                                            },
-                                            exemplars: data_point.exemplars.clone(),
-                                        }],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(hist) = data.downcast_ref::<data::ExponentialHistogram<f64>>() {
-                    for data_point in &hist.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::ExponentialHistogram {
-                                        temporality: hist.temporality,
-                                        data_points: vec![ExponentialHistogramDataPoint {
-                                            attributes: data_point.attributes.clone(),
-                                            count: data_point.count,
-                                            start_time: data_point.start_time,
-                                            time: data_point.time,
-                                            min: data_point.min,
-                                            max: data_point.max,
-                                            sum: data_point.sum,
-                                            scale: data_point.scale,
-                                            zero_count: data_point.zero_count,
-                                            zero_threshold: data_point.zero_threshold,
-                                            positive_bucket: ExponentialBucket {
-                                                offset: data_point.positive_bucket.offset,
-                                                counts: data_point.positive_bucket.counts.clone(),
-                                            },
-                                            negative_bucket: ExponentialBucket {
-                                                offset: data_point.negative_bucket.offset,
-                                                counts: data_point.negative_bucket.counts.clone(),
-                                            },
-                                            exemplars: data_point.exemplars.clone(),
-                                        }],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<u64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<i64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(sum) = data.downcast_ref::<data::Sum<f64>>() {
-                    for data_point in &sum.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Sum {
-                                        temporality: sum.temporality,
-                                        data_points: vec![data_point.clone()],
-                                        is_monotonic: sum.is_monotonic,
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<u64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
+                let proto_data: Option<TonicMetricData> = metric.data.as_any().try_into().ok();
+
+                // This ExportMetricsServiceRequest is created for each metric and will hold a single data point.
+                let mut export_metrics_service_request = ExportMetricsServiceRequest {
+                    resource_metrics: vec![TonicResourceMetrics {
+                        resource: Some((&metrics.resource).into()),
+                        scope_metrics: vec![TonicScopeMetrics {
+                            scope: Some((&scope_metric.scope, None).into()),
+                            metrics: vec![TonicMetric {
+                                name: metric.name.to_string(),
+                                description: metric.description.to_string(),
+                                unit: metric.unit.to_string(),
+                                metadata: vec![],
+                                data: None, // Initially data is None; it will be set based on the type of metric.
                             }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<i64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else if let Some(gauge) = data.downcast_ref::<data::Gauge<f64>>() {
-                    for data_point in &gauge.data_points {
-                        let resource_metric = ResourceMetrics {
-                            resource: metrics.resource.clone(),
-                            scope_metrics: vec![ScopeMetrics {
-                                scope: scope_metric.scope.clone(),
-                                metrics: vec![Metric {
-                                    name: metric.name.clone(),
-                                    description: metric.description.clone(),
-                                    unit: metric.unit.clone(),
-                                    data: Box::new(data::Gauge {
-                                        data_points: vec![data_point.clone()],
-                                    }),
-                                }],
-                            }],
-                        };
-                        resource_metrics.push(resource_metric);
-                    }
-                } else {
-                    otel_warn!(name: "MetricExportFailedDueToUnsupportedMetricType", metric_type = format!("{:?}", data));
-                }
-
-                for resource_metric in resource_metrics {
-                    let mut byte_array = Vec::new();
-                    let proto_message: ExportMetricsServiceRequest = (&resource_metric).into();
-                    proto_message
-                        .encode(&mut byte_array)
-                        .map_err(|err| MetricError::Other(err.to_string()))?;
-
-                    if (byte_array.len()) > etw::MAX_EVENT_SIZE {
-                        otel_warn!(name: "MetricExportFailedDueToMaxSizeLimit", size = byte_array.len(), max_size = etw::MAX_EVENT_SIZE);
-                    } else {
-                        let result = etw::write(&byte_array);
-                        // TODO: Better logging/internal metrics needed here for non-failure
-                        // case Uncomment the line below to see the exported bytes until a
-                        // better logging solution is implemented
-                        // println!("Exported {} bytes to ETW", byte_array.len());
-                        if result != 0 {
-                            otel_warn!(name: "MetricExportFailed", error_code = result);
+                            schema_url: schema_url.clone(),
+                        }],
+                        schema_url: schema_url.clone(),
+                    }],
+                };
+
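+                // The request template above is reused for every data point;
+                // only its `data` field is replaced before each emit.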
+                if let Some(proto_data) = proto_data {
+                    match proto_data {
+                        TonicMetricData::Histogram(hist) => {
+                            for data_point in hist.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Histogram(TonicHistogram {
+                                    aggregation_temporality: hist.aggregation_temporality,
+                                    data_points: vec![data_point],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                    &mut encoding_buffer,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::ExponentialHistogram(exp_hist) => {
+                            for data_point in exp_hist.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::ExponentialHistogram(
+                                    TonicExponentialHistogram {
+                                        aggregation_temporality: exp_hist.aggregation_temporality,
+                                        data_points: vec![data_point],
+                                    },
+                                ));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                    &mut encoding_buffer,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Gauge(gauge) => {
+                            for data_point in gauge.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Gauge(TonicGauge {
+                                    data_points: vec![data_point],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                    &mut encoding_buffer,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Sum(sum) => {
+                            for data_point in sum.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Sum(TonicSum {
+                                    data_points: vec![data_point],
+                                    aggregation_temporality: sum.aggregation_temporality,
+                                    is_monotonic: sum.is_monotonic,
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                    &mut encoding_buffer,
+                                )?;
+                            }
+                        }
+                        TonicMetricData::Summary(summary) => {
+                            for data in summary.data_points {
+                                export_metrics_service_request.resource_metrics[0].scope_metrics
+                                    [0]
+                                    .metrics[0]
+                                    .data = Some(TonicMetricData::Summary(TonicSummary {
+                                    data_points: vec![data],
+                                }));
+                                emit_export_metric_service_request(
+                                    &export_metrics_service_request,
+                                    &mut encoding_buffer,
+                                )?;
+                            }
+                        }
+                    }