diff --git a/opentelemetry-user-events-metrics/CHANGELOG.md b/opentelemetry-user-events-metrics/CHANGELOG.md index be309357..056086ab 100644 --- a/opentelemetry-user-events-metrics/CHANGELOG.md +++ b/opentelemetry-user-events-metrics/CHANGELOG.md @@ -2,6 +2,9 @@ ## Unreleased +- Fixed a bug which caused Histogram, Gauge metrics to be dropped. + [#30](https://github.com/open-telemetry/opentelemetry-rust-contrib/pull/30). + ## v0.2.1 - Update eventheader version to 0.3.4. diff --git a/opentelemetry-user-events-metrics/examples/basic.rs b/opentelemetry-user-events-metrics/examples/basic.rs index 9266b345..65501c4f 100644 --- a/opentelemetry-user-events-metrics/examples/basic.rs +++ b/opentelemetry-user-events-metrics/examples/basic.rs @@ -8,6 +8,8 @@ use opentelemetry_sdk::{ runtime, Resource, }; use opentelemetry_user_events_metrics::MetricsExporter; +use std::thread; +use std::time::Duration; fn init_metrics(exporter: MetricsExporter) -> SdkMeterProvider { let reader = PeriodicReader::builder(exporter, runtime::Tokio).build(); @@ -32,22 +34,109 @@ async fn main() -> Result<(), Box> { Some("test_url"), Some(vec![KeyValue::new("key", "value")]), ); - let c = meter + // Create a Counter Instrument. + let counter = meter .f64_counter("counter_test") .with_description("test_decription") .with_unit(Unit::new("test_unit")) .init(); + // Create a UpCounter Instrument. + let updown_counter = meter.i64_up_down_counter("up_down_counter_test").init(); - c.add( - 1.0, - [ - KeyValue::new("mykey1", "myvalue1"), - KeyValue::new("mykey2", "myvalue2"), - ] - .as_ref(), - ); + // Create a Histogram Instrument. + let histogram = meter + .f64_histogram("histogram_test") + .with_description("test_description") + .init(); + + // Create a ObservableGauge instrument and register a callback that reports the measurement. + let gauge = meter + .f64_observable_gauge("gauge_test") + .with_unit(Unit::new("test_unit")) + .with_description("test_descriptionn") + .init(); + + meter.register_callback(&[gauge.as_any()], move |observer| { + observer.observe_f64( + &gauge, + 1.0, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ) + })?; + + // Create a ObservableCounter instrument and register a callback that reports the measurement. + let observable_counter = meter + .u64_observable_counter("obs_counter_test") + .with_description("test_description") + .with_unit(Unit::new("tesT_unit")) + .init(); + + meter.register_callback(&[observable_counter.as_any()], move |observer| { + observer.observe_u64( + &observable_counter, + 100, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ) + })?; + + // Create a Observable UpDownCounter instrument and register a callback that reports the measurement. + let observable_up_down_counter = meter + .i64_observable_up_down_counter("obs_up_down_counter_test") + .with_description("test_description") + .with_unit(Unit::new("test_unit")) + .init(); + + meter.register_callback(&[observable_up_down_counter.as_any()], move |observer| { + observer.observe_i64( + &observable_up_down_counter, + 100, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ) + })?; + + loop { + // Record measurements using the Counter instrument. + counter.add( + 1.0, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ); - meter_provider.shutdown()?; + // Record measurements using the UpCounter instrument. + updown_counter.add( + -10, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ); - Ok(()) + // Record measurements using the histogram instrument. + histogram.record( + 10.5, + [ + KeyValue::new("mykey1", "myvalue1"), + KeyValue::new("mykey2", "myvalue2"), + ] + .as_ref(), + ); + // Sleep for 1 second + thread::sleep(Duration::from_secs(1)); + } } diff --git a/opentelemetry-user-events-metrics/src/exporter/mod.rs b/opentelemetry-user-events-metrics/src/exporter/mod.rs index c6e3caca..dc4e1b59 100644 --- a/opentelemetry-user-events-metrics/src/exporter/mod.rs +++ b/opentelemetry-user-events-metrics/src/exporter/mod.rs @@ -1,6 +1,6 @@ -use crate::transform::transform_resource_metrics; use async_trait::async_trait; use opentelemetry::metrics::{MetricsError, Result}; +use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest; use opentelemetry_sdk::metrics::{ data::{ResourceMetrics, Temporality}, exporter::PushMetricsExporter, @@ -69,7 +69,7 @@ impl Debug for MetricsExporter { impl PushMetricsExporter for MetricsExporter { async fn export(&self, metrics: &mut ResourceMetrics) -> Result<()> { if self.trace_point.enabled() { - let proto_message = transform_resource_metrics(metrics); + let proto_message: ExportMetricsServiceRequest = (&*metrics).into(); let mut byte_array = Vec::new(); let _encode_result = proto_message diff --git a/opentelemetry-user-events-metrics/src/lib.rs b/opentelemetry-user-events-metrics/src/lib.rs index 5517451e..5ebfb859 100644 --- a/opentelemetry-user-events-metrics/src/lib.rs +++ b/opentelemetry-user-events-metrics/src/lib.rs @@ -1,5 +1,4 @@ mod exporter; mod tracepoint; -mod transform; pub use exporter::MetricsExporter; diff --git a/opentelemetry-user-events-metrics/src/transform/mod.rs b/opentelemetry-user-events-metrics/src/transform/mod.rs deleted file mode 100644 index ec736485..00000000 --- a/opentelemetry-user-events-metrics/src/transform/mod.rs +++ /dev/null @@ -1,117 +0,0 @@ -use opentelemetry::{global, metrics::MetricsError}; -use opentelemetry_proto::tonic::common::v1::InstrumentationScope as TonicInstrumentationScope; -use opentelemetry_proto::tonic::resource::v1::Resource as TonicResource; -use opentelemetry_proto::tonic::{ - collector::metrics::v1::ExportMetricsServiceRequest, - metrics::v1::{ - exemplar::Value as TonicExemplarValue, metric::Data as TonicMetricData, - number_data_point::Value as TonicDataPointValue, - AggregationTemporality as TonicTemporality, DataPointFlags as TonicDataPointFlags, - Metric as TonicMetric, NumberDataPoint as TonicNumberDataPoint, - ResourceMetrics as TonicResourceMetrics, ScopeMetrics as TonicScopeMetrics, - Sum as TonicSum, - }, -}; -use opentelemetry_sdk::metrics::data::{ - Metric as SdkMetric, ResourceMetrics as SDKResourceMetrics, ScopeMetrics as SdkScopeMetrics, - Sum as SdkSum, -}; -use opentelemetry_sdk::Resource as SdkResource; -use std::any::Any; -use std::fmt; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; - -pub(crate) fn transform_resource_metrics( - metrics: &SDKResourceMetrics, -) -> ExportMetricsServiceRequest { - ExportMetricsServiceRequest { - resource_metrics: vec![TonicResourceMetrics { - resource: transform_resource(&metrics.resource), - scope_metrics: transform_scope_metrics(&metrics.scope_metrics), - schema_url: metrics - .resource - .schema_url() - .map(Into::into) - .unwrap_or_default(), - }], - } -} - -fn transform_resource(r: &SdkResource) -> Option { - if r.is_empty() { - return None; - } - - Some(TonicResource { - attributes: r.iter().map(Into::into).collect(), - dropped_attributes_count: 0, - }) -} - -fn transform_scope_metrics(sms: &[SdkScopeMetrics]) -> Vec { - sms.iter() - .map(|sm| TonicScopeMetrics { - scope: Some(TonicInstrumentationScope::from(&sm.scope)), - metrics: transform_metrics(&sm.metrics), - schema_url: sm - .scope - .schema_url - .as_ref() - .map(ToString::to_string) - .unwrap_or_default(), - }) - .collect() -} - -fn transform_metrics(metrics: &[SdkMetric]) -> Vec { - metrics - .iter() - .map(|metric| TonicMetric { - name: metric.name.to_string(), - description: metric.description.to_string(), - unit: metric.unit.as_str().to_string(), - data: transform_data(metric.data.as_any()), - }) - .collect() -} - -fn transform_data(data: &dyn Any) -> Option { - if let Some(sum) = data.downcast_ref::>() { - Some(TonicMetricData::Sum(transform_sum(sum))) - } else if let Some(sum) = data.downcast_ref::>() { - Some(TonicMetricData::Sum(transform_sum(sum))) - } else if let Some(sum) = data.downcast_ref::>() { - Some(TonicMetricData::Sum(transform_sum(sum))) - } else { - global::handle_error(MetricsError::Other("unknown aggregator".into())); - None - } -} - -fn transform_sum + Into + Copy>( - sum: &SdkSum, -) -> TonicSum { - TonicSum { - data_points: sum - .data_points - .iter() - .map(|dp| TonicNumberDataPoint { - attributes: dp.attributes.iter().map(Into::into).collect(), - start_time_unix_nano: dp.start_time.map(to_nanos).unwrap_or_default(), - time_unix_nano: dp.time.map(to_nanos).unwrap_or_default(), - // No support for exemplars - exemplars: Vec::new(), - flags: TonicDataPointFlags::default() as u32, - value: Some(dp.value.into()), - }) - .collect(), - aggregation_temporality: TonicTemporality::from(sum.temporality).into(), - is_monotonic: sum.is_monotonic, - } -} - -fn to_nanos(time: SystemTime) -> u64 { - time.duration_since(UNIX_EPOCH) - .unwrap_or_else(|_| Duration::from_secs(0)) - .as_nanos() as u64 -}