Skip to content

Commit

Permalink
chore: Pipeline metrics (#2179)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing authored Oct 24, 2024
1 parent 02e8260 commit 25c0d32
Show file tree
Hide file tree
Showing 7 changed files with 143 additions and 9 deletions.
6 changes: 4 additions & 2 deletions examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ kind: Pipeline
metadata:
name: simple-pipeline
spec:
limits:
readBatchSize: 1000
vertices:
- name: in
scale:
min: 1
# A self data generating source
source:
generator:
rpu: 50000
msgSize: 1000
rpu: 100
msgSize: 100
duration: 1s

- name: out
Expand Down
8 changes: 7 additions & 1 deletion pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
if partitionName == "" {
r.log.Warnf("[vertex name %s, pod name %s]: Partition name is not found for metric %s", vertexName, podName, readTotalMetricName)
} else {
partitionReadCount[partitionName] = ele.Counter.GetValue()
// https://github.com/prometheus/client_rust/issues/194
counterVal := ele.Counter.GetValue()
untypedVal := ele.Untyped.GetValue()
if counterVal == 0 && untypedVal != 0 {
counterVal = untypedVal
}
partitionReadCount[partitionName] = counterVal
}
}
podReadCount := &PodReadCount{podName, partitionReadCount}
Expand Down
1 change: 1 addition & 0 deletions pkg/mvtxdaemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount {
// from the results safely.
// We use Untyped here as the counter metric family shows up as untyped from the rust client
// TODO(MonoVertex): Check further on this to understand why not type is counter
// https://github.com/prometheus/client_rust/issues/194
podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()}
return podReadCount
} else {
Expand Down
73 changes: 69 additions & 4 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ const MVTX_NAME_LABEL: &str = "mvtx_name";
const REPLICA_LABEL: &str = "mvtx_replica";
const PENDING_PERIOD_LABEL: &str = "period";

const PIPELINE_NAME_LABEL: &str = "pipeline";
const PIPELINE_REPLICA_LABEL: &str = "replica";
const PIPELINE_PARTITION_NAME_LABEL: &str = "partition_name";
const PIPELINE_VERTEX_LABEL: &str = "vertex";
const PIPELINE_VERTEX_TYPE_LABEL: &str = "vertex_type";

// The top-level metric registry is created with the GLOBAL_PREFIX
const GLOBAL_PREFIX: &str = "monovtx";
const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx";
// Prefixes for the sub-registries
const SINK_REGISTRY_PREFIX: &str = "sink";
const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink";
Expand Down Expand Up @@ -67,6 +73,8 @@ const TRANSFORM_TIME: &str = "time";
const ACK_TIME: &str = "ack_time";
const SINK_TIME: &str = "time";

const PIPELINE_FORWARDER_READ_TOTAL: &str = "data_read";

/// Only user defined functions will have containers since rest
/// are builtins. We save the gRPC clients to retrieve metrics and also
/// to do liveness checks.
Expand Down Expand Up @@ -112,7 +120,7 @@ impl GlobalRegistry {
fn new() -> Self {
GlobalRegistry {
// Create a new registry for the metrics
registry: parking_lot::Mutex::new(Registry::with_prefix(GLOBAL_PREFIX)),
registry: parking_lot::Mutex::new(Registry::default()),
}
}
}
Expand Down Expand Up @@ -154,7 +162,9 @@ pub(crate) struct MonoVtxMetrics {

/// PipelineMetrics is a struct which is used for storing the metrics related to the Pipeline
// TODO: Add the metrics for the pipeline
pub(crate) struct PipelineMetrics {}
pub(crate) struct PipelineMetrics {
pub(crate) forwarder: PipelineForwarderMetrics,
}

/// Family of metrics for the sink
pub(crate) struct SinkMetrics {
Expand All @@ -173,6 +183,10 @@ pub(crate) struct TransformerMetrics {
pub(crate) time: Family<Vec<(String, String)>, Histogram>,
}

pub(crate) struct PipelineForwarderMetrics {
pub(crate) data_read: Family<Vec<(String, String)>, Counter>,
}

/// Exponential bucket distribution with range.
/// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`.
/// The final +Inf bucket is not counted and not included in the returned iterator.
Expand Down Expand Up @@ -235,6 +249,7 @@ impl MonoVtxMetrics {
};

let mut registry = global_registry().registry.lock();
let registry = registry.sub_registry_with_prefix(MVTX_REGISTRY_GLOBAL_PREFIX);
// Register all the metrics to the global registry
registry.register(
READ_TOTAL,
Expand Down Expand Up @@ -314,6 +329,26 @@ impl MonoVtxMetrics {
}
}

impl PipelineMetrics {
fn new() -> Self {
let metrics = Self {
forwarder: PipelineForwarderMetrics {
data_read: Default::default(),
},
};
let mut registry = global_registry().registry.lock();

// Pipeline forwarder sub-registry
let forwarder_registry = registry.sub_registry_with_prefix("forwarder");
forwarder_registry.register(
PIPELINE_FORWARDER_READ_TOTAL,
"Total number of Data Messages Read",
metrics.forwarder.data_read.clone(),
);
metrics
}
}

/// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics
static MONOVTX_METRICS: OnceLock<MonoVtxMetrics> = OnceLock::new();

Expand All @@ -329,7 +364,7 @@ static PIPELINE_METRICS: OnceLock<PipelineMetrics> = OnceLock::new();
// forward_pipeline_metrics is a helper function used to fetch the
// PipelineMetrics object
pub(crate) fn forward_pipeline_metrics() -> &'static PipelineMetrics {
PIPELINE_METRICS.get_or_init(|| PipelineMetrics {})
PIPELINE_METRICS.get_or_init(PipelineMetrics::new)
}

/// MONOVTX_METRICS_LABELS are used to store the common labels used in the metrics
Expand All @@ -350,6 +385,32 @@ pub(crate) fn mvtx_forward_metric_labels(
})
}

static PIPELINE_READ_METRICS_LABELS: OnceLock<Vec<(String, String)>> = OnceLock::new();

pub(crate) fn pipeline_forward_read_metric_labels(
pipeline_name: &str,
partition_name: &str,
vertex_name: &str,
vertex_type: &str,
replica: u16,
) -> &'static Vec<(String, String)> {
PIPELINE_READ_METRICS_LABELS.get_or_init(|| {
vec![
(PIPELINE_NAME_LABEL.to_string(), pipeline_name.to_string()),
(PIPELINE_REPLICA_LABEL.to_string(), replica.to_string()),
(
PIPELINE_PARTITION_NAME_LABEL.to_string(),
partition_name.to_string(),
),
(
PIPELINE_VERTEX_TYPE_LABEL.to_string(),
vertex_type.to_string(),
),
(PIPELINE_VERTEX_LABEL.to_string(), vertex_name.to_string()),
]
})
}

// metrics_handler is used to generate and return a snapshot of the
// current state of the metrics in the global registry
pub async fn metrics_handler() -> impl IntoResponse {
Expand All @@ -359,6 +420,10 @@ pub async fn metrics_handler() -> impl IntoResponse {
debug!("Exposing metrics: {:?}", buffer);
Response::builder()
.status(StatusCode::OK)
.header(
axum::http::header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(Body::from(buffer))
.unwrap()
}
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub(crate) async fn start_forwarder(
transformer,
buffer_writers,
cln_token.clone(),
config.clone(),
)
.build();
forwarder.start().await?;
Expand Down Expand Up @@ -197,7 +198,7 @@ async fn create_sink_writer(
}
};
Ok((
SinkWriter::new(config.batch_size, sink_handle).await?,
SinkWriter::new(config.batch_size, sink_handle, config.clone()).await?,
sink_grpc_client,
))
}
Expand Down
20 changes: 20 additions & 0 deletions rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::config::pipeline::PipelineConfig;
use crate::error;
use crate::error::Error;
use crate::message::{Message, Offset};
use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels};
use crate::pipeline::isb::jetstream::WriterHandle;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
Expand All @@ -16,13 +18,15 @@ pub(crate) struct Forwarder {
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
}

pub(crate) struct ForwarderBuilder {
source_reader: SourceHandle,
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
}

impl ForwarderBuilder {
Expand All @@ -31,12 +35,14 @@ impl ForwarderBuilder {
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
) -> Self {
Self {
source_reader,
transformer,
buffer_writers,
cln_token,
config,
}
}

Expand All @@ -46,6 +52,7 @@ impl ForwarderBuilder {
transformer: self.transformer,
buffer_writers: self.buffer_writers,
cln_token: self.cln_token,
config: self.config,
}
}
}
Expand Down Expand Up @@ -87,6 +94,19 @@ impl Forwarder {
start_time.elapsed().as_millis()
);

let labels = pipeline_forward_read_metric_labels(
self.config.pipeline_name.as_ref(),
self.config.vertex_name.as_ref(),
self.config.vertex_name.as_ref(),
"Source",
self.config.replica,
);
forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc_by(messages.len() as u64);

if messages.is_empty() {
return Ok(0);
}
Expand Down
41 changes: 40 additions & 1 deletion rust/numaflow-core/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::config::pipeline::PipelineConfig;
use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels};
use crate::Result;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -123,13 +125,19 @@ impl SinkHandle {
pub(super) struct SinkWriter {
batch_size: usize,
sink_handle: SinkHandle,
config: PipelineConfig,
}

impl SinkWriter {
pub(super) async fn new(batch_size: usize, sink_handle: SinkHandle) -> Result<Self> {
pub(super) async fn new(
batch_size: usize,
sink_handle: SinkHandle,
config: PipelineConfig,
) -> Result<Self> {
Ok(Self {
batch_size,
sink_handle,
config,
})
}

Expand All @@ -139,12 +147,43 @@ impl SinkWriter {
) -> Result<JoinHandle<()>> {
let handle: JoinHandle<()> = tokio::spawn({
let this = self.clone();

// let partition_name = format!(
// "default-{}-{}-{}",
// self.config.pipeline_name, self.config.vertex_name, self.config.replica
// );

// FIXME:
let partition: &str = self
.config
.from_vertex_config
.first()
.unwrap()
.reader_config
.streams
.first()
.unwrap()
.0
.as_ref();

let labels = pipeline_forward_read_metric_labels(
self.config.pipeline_name.as_ref(),
partition,
self.config.vertex_name.as_ref(),
"Sink",
self.config.replica,
);
async move {
loop {
let mut batch = Vec::with_capacity(this.batch_size);
for _ in 0..this.batch_size {
if let Some(read_message) = messages_rx.recv().await {
batch.push(read_message);
forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc();
}
}

Expand Down

0 comments on commit 25c0d32

Please sign in to comment.