diff --git a/examples/1-simple-pipeline.yaml b/examples/1-simple-pipeline.yaml index 8e16389339..77c64533de 100644 --- a/examples/1-simple-pipeline.yaml +++ b/examples/1-simple-pipeline.yaml @@ -3,6 +3,8 @@ kind: Pipeline metadata: name: simple-pipeline spec: + limits: + readBatchSize: 1000 vertices: - name: in scale: @@ -10,8 +12,8 @@ spec: # A self data generating source source: generator: - rpu: 50000 - msgSize: 1000 + rpu: 100 + msgSize: 100 duration: 1s - name: out diff --git a/pkg/daemon/server/service/rater/rater.go b/pkg/daemon/server/service/rater/rater.go index 86b2ad10de..94e7bc9875 100644 --- a/pkg/daemon/server/service/rater/rater.go +++ b/pkg/daemon/server/service/rater/rater.go @@ -248,7 +248,13 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount { if partitionName == "" { r.log.Warnf("[vertex name %s, pod name %s]: Partition name is not found for metric %s", vertexName, podName, readTotalMetricName) } else { - partitionReadCount[partitionName] = ele.Counter.GetValue() + // https://github.com/prometheus/client_rust/issues/194 + counterVal := ele.Counter.GetValue() + untypedVal := ele.Untyped.GetValue() + if counterVal == 0 && untypedVal != 0 { + counterVal = untypedVal + } + partitionReadCount[partitionName] = counterVal } } podReadCount := &PodReadCount{podName, partitionReadCount} diff --git a/pkg/mvtxdaemon/server/service/rater/rater.go b/pkg/mvtxdaemon/server/service/rater/rater.go index 19a3ee87dc..ac7422952e 100644 --- a/pkg/mvtxdaemon/server/service/rater/rater.go +++ b/pkg/mvtxdaemon/server/service/rater/rater.go @@ -179,6 +179,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount { // from the results safely. // We use Untyped here as the counter metric family shows up as untyped from the rust client // TODO(MonoVertex): Check further on this to understand why not type is counter + // https://github.com/prometheus/client_rust/issues/194 podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()} return podReadCount } else { diff --git a/rust/numaflow-core/src/metrics.rs b/rust/numaflow-core/src/metrics.rs index 26a85fa8ed..1022d778d9 100644 --- a/rust/numaflow-core/src/metrics.rs +++ b/rust/numaflow-core/src/metrics.rs @@ -36,8 +36,14 @@ const MVTX_NAME_LABEL: &str = "mvtx_name"; const REPLICA_LABEL: &str = "mvtx_replica"; const PENDING_PERIOD_LABEL: &str = "period"; +const PIPELINE_NAME_LABEL: &str = "pipeline"; +const PIPELINE_REPLICA_LABEL: &str = "replica"; +const PIPELINE_PARTITION_NAME_LABEL: &str = "partition_name"; +const PIPELINE_VERTEX_LABEL: &str = "vertex"; +const PIPELINE_VERTEX_TYPE_LABEL: &str = "vertex_type"; + // The top-level metric registry is created with the GLOBAL_PREFIX -const GLOBAL_PREFIX: &str = "monovtx"; +const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx"; // Prefixes for the sub-registries const SINK_REGISTRY_PREFIX: &str = "sink"; const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink"; @@ -67,6 +73,8 @@ const TRANSFORM_TIME: &str = "time"; const ACK_TIME: &str = "ack_time"; const SINK_TIME: &str = "time"; +const PIPELINE_FORWARDER_READ_TOTAL: &str = "data_read"; + /// Only user defined functions will have containers since rest /// are builtins. We save the gRPC clients to retrieve metrics and also /// to do liveness checks. @@ -112,7 +120,7 @@ impl GlobalRegistry { fn new() -> Self { GlobalRegistry { // Create a new registry for the metrics - registry: parking_lot::Mutex::new(Registry::with_prefix(GLOBAL_PREFIX)), + registry: parking_lot::Mutex::new(Registry::default()), } } } @@ -154,7 +162,9 @@ pub(crate) struct MonoVtxMetrics { /// PipelineMetrics is a struct which is used for storing the metrics related to the Pipeline // TODO: Add the metrics for the pipeline -pub(crate) struct PipelineMetrics {} +pub(crate) struct PipelineMetrics { + pub(crate) forwarder: PipelineForwarderMetrics, +} /// Family of metrics for the sink pub(crate) struct SinkMetrics { @@ -173,6 +183,10 @@ pub(crate) struct TransformerMetrics { pub(crate) time: Family, Histogram>, } +pub(crate) struct PipelineForwarderMetrics { + pub(crate) data_read: Family, Counter>, +} + /// Exponential bucket distribution with range. /// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`. /// The final +Inf bucket is not counted and not included in the returned iterator. @@ -235,6 +249,7 @@ impl MonoVtxMetrics { }; let mut registry = global_registry().registry.lock(); + let registry = registry.sub_registry_with_prefix(MVTX_REGISTRY_GLOBAL_PREFIX); // Register all the metrics to the global registry registry.register( READ_TOTAL, @@ -314,6 +329,26 @@ impl MonoVtxMetrics { } } +impl PipelineMetrics { + fn new() -> Self { + let metrics = Self { + forwarder: PipelineForwarderMetrics { + data_read: Default::default(), + }, + }; + let mut registry = global_registry().registry.lock(); + + // Pipeline forwarder sub-registry + let forwarder_registry = registry.sub_registry_with_prefix("forwarder"); + forwarder_registry.register( + PIPELINE_FORWARDER_READ_TOTAL, + "Total number of Data Messages Read", + metrics.forwarder.data_read.clone(), + ); + metrics + } +} + /// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics static MONOVTX_METRICS: OnceLock = OnceLock::new(); @@ -329,7 +364,7 @@ static PIPELINE_METRICS: OnceLock = OnceLock::new(); // forward_pipeline_metrics is a helper function used to fetch the // PipelineMetrics object pub(crate) fn forward_pipeline_metrics() -> &'static PipelineMetrics { - PIPELINE_METRICS.get_or_init(|| PipelineMetrics {}) + PIPELINE_METRICS.get_or_init(PipelineMetrics::new) } /// MONOVTX_METRICS_LABELS are used to store the common labels used in the metrics @@ -350,6 +385,32 @@ pub(crate) fn mvtx_forward_metric_labels( }) } +static PIPELINE_READ_METRICS_LABELS: OnceLock> = OnceLock::new(); + +pub(crate) fn pipeline_forward_read_metric_labels( + pipeline_name: &str, + partition_name: &str, + vertex_name: &str, + vertex_type: &str, + replica: u16, +) -> &'static Vec<(String, String)> { + PIPELINE_READ_METRICS_LABELS.get_or_init(|| { + vec![ + (PIPELINE_NAME_LABEL.to_string(), pipeline_name.to_string()), + (PIPELINE_REPLICA_LABEL.to_string(), replica.to_string()), + ( + PIPELINE_PARTITION_NAME_LABEL.to_string(), + partition_name.to_string(), + ), + ( + PIPELINE_VERTEX_TYPE_LABEL.to_string(), + vertex_type.to_string(), + ), + (PIPELINE_VERTEX_LABEL.to_string(), vertex_name.to_string()), + ] + }) +} + // metrics_handler is used to generate and return a snapshot of the // current state of the metrics in the global registry pub async fn metrics_handler() -> impl IntoResponse { @@ -359,6 +420,10 @@ pub async fn metrics_handler() -> impl IntoResponse { debug!("Exposing metrics: {:?}", buffer); Response::builder() .status(StatusCode::OK) + .header( + axum::http::header::CONTENT_TYPE, + "application/openmetrics-text; version=1.0.0; charset=utf-8", + ) .body(Body::from(buffer)) .unwrap() } diff --git a/rust/numaflow-core/src/pipeline.rs b/rust/numaflow-core/src/pipeline.rs index 82e40fe389..669b5bd531 100644 --- a/rust/numaflow-core/src/pipeline.rs +++ b/rust/numaflow-core/src/pipeline.rs @@ -59,6 +59,7 @@ pub(crate) async fn start_forwarder( transformer, buffer_writers, cln_token.clone(), + config.clone(), ) .build(); forwarder.start().await?; @@ -197,7 +198,7 @@ async fn create_sink_writer( } }; Ok(( - SinkWriter::new(config.batch_size, sink_handle).await?, + SinkWriter::new(config.batch_size, sink_handle, config.clone()).await?, sink_grpc_client, )) } diff --git a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs index cdebcd3916..c4acc66c95 100644 --- a/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs +++ b/rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs @@ -1,6 +1,8 @@ +use crate::config::pipeline::PipelineConfig; use crate::error; use crate::error::Error; use crate::message::{Message, Offset}; +use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels}; use crate::pipeline::isb::jetstream::WriterHandle; use crate::source::SourceHandle; use crate::transformer::user_defined::SourceTransformHandle; @@ -16,6 +18,7 @@ pub(crate) struct Forwarder { transformer: Option, buffer_writers: HashMap>, cln_token: CancellationToken, + config: PipelineConfig, } pub(crate) struct ForwarderBuilder { @@ -23,6 +26,7 @@ pub(crate) struct ForwarderBuilder { transformer: Option, buffer_writers: HashMap>, cln_token: CancellationToken, + config: PipelineConfig, } impl ForwarderBuilder { @@ -31,12 +35,14 @@ impl ForwarderBuilder { transformer: Option, buffer_writers: HashMap>, cln_token: CancellationToken, + config: PipelineConfig, ) -> Self { Self { source_reader, transformer, buffer_writers, cln_token, + config, } } @@ -46,6 +52,7 @@ impl ForwarderBuilder { transformer: self.transformer, buffer_writers: self.buffer_writers, cln_token: self.cln_token, + config: self.config, } } } @@ -87,6 +94,19 @@ impl Forwarder { start_time.elapsed().as_millis() ); + let labels = pipeline_forward_read_metric_labels( + self.config.pipeline_name.as_ref(), + self.config.vertex_name.as_ref(), + self.config.vertex_name.as_ref(), + "Source", + self.config.replica, + ); + forward_pipeline_metrics() + .forwarder + .data_read + .get_or_create(labels) + .inc_by(messages.len() as u64); + if messages.is_empty() { return Ok(0); } diff --git a/rust/numaflow-core/src/sink.rs b/rust/numaflow-core/src/sink.rs index 5c87f15979..71f2772872 100644 --- a/rust/numaflow-core/src/sink.rs +++ b/rust/numaflow-core/src/sink.rs @@ -1,3 +1,5 @@ +use crate::config::pipeline::PipelineConfig; +use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels}; use crate::Result; use numaflow_pb::clients::sink::sink_client::SinkClient; use tokio::sync::mpsc::Receiver; @@ -123,13 +125,19 @@ impl SinkHandle { pub(super) struct SinkWriter { batch_size: usize, sink_handle: SinkHandle, + config: PipelineConfig, } impl SinkWriter { - pub(super) async fn new(batch_size: usize, sink_handle: SinkHandle) -> Result { + pub(super) async fn new( + batch_size: usize, + sink_handle: SinkHandle, + config: PipelineConfig, + ) -> Result { Ok(Self { batch_size, sink_handle, + config, }) } @@ -139,12 +147,43 @@ impl SinkWriter { ) -> Result> { let handle: JoinHandle<()> = tokio::spawn({ let this = self.clone(); + + // let partition_name = format!( + // "default-{}-{}-{}", + // self.config.pipeline_name, self.config.vertex_name, self.config.replica + // ); + + // FIXME: + let partition: &str = self + .config + .from_vertex_config + .first() + .unwrap() + .reader_config + .streams + .first() + .unwrap() + .0 + .as_ref(); + + let labels = pipeline_forward_read_metric_labels( + self.config.pipeline_name.as_ref(), + partition, + self.config.vertex_name.as_ref(), + "Sink", + self.config.replica, + ); async move { loop { let mut batch = Vec::with_capacity(this.batch_size); for _ in 0..this.batch_size { if let Some(read_message) = messages_rx.recv().await { batch.push(read_message); + forward_pipeline_metrics() + .forwarder + .data_read + .get_or_create(labels) + .inc(); } }