Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Pipeline metrics #2179

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions examples/1-simple-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,17 @@ kind: Pipeline
metadata:
name: simple-pipeline
spec:
limits:
readBatchSize: 1000
vertices:
- name: in
scale:
min: 1
# A self data generating source
source:
generator:
rpu: 50000
msgSize: 1000
rpu: 100
msgSize: 100
duration: 1s

- name: out
Expand Down
8 changes: 7 additions & 1 deletion pkg/daemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,13 @@ func (r *Rater) getPodReadCounts(vertexName, podName string) *PodReadCount {
if partitionName == "" {
r.log.Warnf("[vertex name %s, pod name %s]: Partition name is not found for metric %s", vertexName, podName, readTotalMetricName)
} else {
partitionReadCount[partitionName] = ele.Counter.GetValue()
// https://github.com/prometheus/client_rust/issues/194
counterVal := ele.Counter.GetValue()
untypedVal := ele.Untyped.GetValue()
if counterVal == 0 && untypedVal != 0 {
counterVal = untypedVal
}
partitionReadCount[partitionName] = counterVal
}
}
podReadCount := &PodReadCount{podName, partitionReadCount}
Expand Down
1 change: 1 addition & 0 deletions pkg/mvtxdaemon/server/service/rater/rater.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ func (r *Rater) getPodReadCounts(podName string) *PodReadCount {
// from the results safely.
// We use Untyped here as the counter metric family shows up as untyped from the rust client
// TODO(MonoVertex): Check further on this to understand why not type is counter
// https://github.com/prometheus/client_rust/issues/194
podReadCount := &PodReadCount{podName, metricsList[0].Untyped.GetValue()}
return podReadCount
} else {
Expand Down
73 changes: 69 additions & 4 deletions rust/numaflow-core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,14 @@ const MVTX_NAME_LABEL: &str = "mvtx_name";
const REPLICA_LABEL: &str = "mvtx_replica";
const PENDING_PERIOD_LABEL: &str = "period";

const PIPELINE_NAME_LABEL: &str = "pipeline";
const PIPELINE_REPLICA_LABEL: &str = "replica";
const PIPELINE_PARTITION_NAME_LABEL: &str = "partition_name";
const PIPELINE_VERTEX_LABEL: &str = "vertex";
const PIPELINE_VERTEX_TYPE_LABEL: &str = "vertex_type";

// The top-level metric registry is created with the GLOBAL_PREFIX
const GLOBAL_PREFIX: &str = "monovtx";
const MVTX_REGISTRY_GLOBAL_PREFIX: &str = "monovtx";
// Prefixes for the sub-registries
const SINK_REGISTRY_PREFIX: &str = "sink";
const FALLBACK_SINK_REGISTRY_PREFIX: &str = "fallback_sink";
Expand Down Expand Up @@ -67,6 +73,8 @@ const TRANSFORM_TIME: &str = "time";
const ACK_TIME: &str = "ack_time";
const SINK_TIME: &str = "time";

const PIPELINE_FORWARDER_READ_TOTAL: &str = "data_read";

/// Only user defined functions will have containers since rest
/// are builtins. We save the gRPC clients to retrieve metrics and also
/// to do liveness checks.
Expand Down Expand Up @@ -112,7 +120,7 @@ impl GlobalRegistry {
fn new() -> Self {
GlobalRegistry {
// Create a new registry for the metrics
registry: parking_lot::Mutex::new(Registry::with_prefix(GLOBAL_PREFIX)),
registry: parking_lot::Mutex::new(Registry::default()),
}
}
}
Expand Down Expand Up @@ -154,7 +162,9 @@ pub(crate) struct MonoVtxMetrics {

/// PipelineMetrics is a struct which is used for storing the metrics related to the Pipeline
// TODO: Add the metrics for the pipeline
pub(crate) struct PipelineMetrics {}
pub(crate) struct PipelineMetrics {
pub(crate) forwarder: PipelineForwarderMetrics,
}

/// Family of metrics for the sink
pub(crate) struct SinkMetrics {
Expand All @@ -173,6 +183,10 @@ pub(crate) struct TransformerMetrics {
pub(crate) time: Family<Vec<(String, String)>, Histogram>,
}

pub(crate) struct PipelineForwarderMetrics {
pub(crate) data_read: Family<Vec<(String, String)>, Counter>,
}

/// Exponential bucket distribution with range.
/// Creates `length` buckets, where the lowest bucket is `min` and the highest bucket is `max`.
/// The final +Inf bucket is not counted and not included in the returned iterator.
Expand Down Expand Up @@ -235,6 +249,7 @@ impl MonoVtxMetrics {
};

let mut registry = global_registry().registry.lock();
let registry = registry.sub_registry_with_prefix(MVTX_REGISTRY_GLOBAL_PREFIX);
// Register all the metrics to the global registry
registry.register(
READ_TOTAL,
Expand Down Expand Up @@ -314,6 +329,26 @@ impl MonoVtxMetrics {
}
}

impl PipelineMetrics {
fn new() -> Self {
let metrics = Self {
forwarder: PipelineForwarderMetrics {
data_read: Default::default(),
},
};
let mut registry = global_registry().registry.lock();

// Pipeline forwarder sub-registry
let forwarder_registry = registry.sub_registry_with_prefix("forwarder");
forwarder_registry.register(
PIPELINE_FORWARDER_READ_TOTAL,
"Total number of Data Messages Read",
metrics.forwarder.data_read.clone(),
);
metrics
}
}

/// MONOVTX_METRICS is the MonoVtxMetrics object which stores the metrics
static MONOVTX_METRICS: OnceLock<MonoVtxMetrics> = OnceLock::new();

Expand All @@ -329,7 +364,7 @@ static PIPELINE_METRICS: OnceLock<PipelineMetrics> = OnceLock::new();
// forward_pipeline_metrics is a helper function used to fetch the
// PipelineMetrics object
pub(crate) fn forward_pipeline_metrics() -> &'static PipelineMetrics {
PIPELINE_METRICS.get_or_init(|| PipelineMetrics {})
PIPELINE_METRICS.get_or_init(PipelineMetrics::new)
}

/// MONOVTX_METRICS_LABELS are used to store the common labels used in the metrics
Expand All @@ -350,6 +385,32 @@ pub(crate) fn mvtx_forward_metric_labels(
})
}

static PIPELINE_READ_METRICS_LABELS: OnceLock<Vec<(String, String)>> = OnceLock::new();

pub(crate) fn pipeline_forward_read_metric_labels(
pipeline_name: &str,
partition_name: &str,
vertex_name: &str,
vertex_type: &str,
replica: u16,
) -> &'static Vec<(String, String)> {
PIPELINE_READ_METRICS_LABELS.get_or_init(|| {
vec![
(PIPELINE_NAME_LABEL.to_string(), pipeline_name.to_string()),
(PIPELINE_REPLICA_LABEL.to_string(), replica.to_string()),
(
PIPELINE_PARTITION_NAME_LABEL.to_string(),
partition_name.to_string(),
),
(
PIPELINE_VERTEX_TYPE_LABEL.to_string(),
vertex_type.to_string(),
),
(PIPELINE_VERTEX_LABEL.to_string(), vertex_name.to_string()),
]
})
}

// metrics_handler is used to generate and return a snapshot of the
// current state of the metrics in the global registry
pub async fn metrics_handler() -> impl IntoResponse {
Expand All @@ -359,6 +420,10 @@ pub async fn metrics_handler() -> impl IntoResponse {
debug!("Exposing metrics: {:?}", buffer);
Response::builder()
.status(StatusCode::OK)
.header(
axum::http::header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
)
.body(Body::from(buffer))
.unwrap()
}
Expand Down
3 changes: 2 additions & 1 deletion rust/numaflow-core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub(crate) async fn start_forwarder(
transformer,
buffer_writers,
cln_token.clone(),
config.clone(),
)
.build();
forwarder.start().await?;
Expand Down Expand Up @@ -197,7 +198,7 @@ async fn create_sink_writer(
}
};
Ok((
SinkWriter::new(config.batch_size, sink_handle).await?,
SinkWriter::new(config.batch_size, sink_handle, config.clone()).await?,
sink_grpc_client,
))
}
Expand Down
20 changes: 20 additions & 0 deletions rust/numaflow-core/src/pipeline/forwarder/source_forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::config::pipeline::PipelineConfig;
use crate::error;
use crate::error::Error;
use crate::message::{Message, Offset};
use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels};
use crate::pipeline::isb::jetstream::WriterHandle;
use crate::source::SourceHandle;
use crate::transformer::user_defined::SourceTransformHandle;
Expand All @@ -16,13 +18,15 @@ pub(crate) struct Forwarder {
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
}

pub(crate) struct ForwarderBuilder {
source_reader: SourceHandle,
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
}

impl ForwarderBuilder {
Expand All @@ -31,12 +35,14 @@ impl ForwarderBuilder {
transformer: Option<SourceTransformHandle>,
buffer_writers: HashMap<String, Vec<WriterHandle>>,
cln_token: CancellationToken,
config: PipelineConfig,
) -> Self {
Self {
source_reader,
transformer,
buffer_writers,
cln_token,
config,
}
}

Expand All @@ -46,6 +52,7 @@ impl ForwarderBuilder {
transformer: self.transformer,
buffer_writers: self.buffer_writers,
cln_token: self.cln_token,
config: self.config,
}
}
}
Expand Down Expand Up @@ -87,6 +94,19 @@ impl Forwarder {
start_time.elapsed().as_millis()
);

let labels = pipeline_forward_read_metric_labels(
self.config.pipeline_name.as_ref(),
self.config.vertex_name.as_ref(),
self.config.vertex_name.as_ref(),
"Source",
self.config.replica,
);
forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc_by(messages.len() as u64);

if messages.is_empty() {
return Ok(0);
}
Expand Down
41 changes: 40 additions & 1 deletion rust/numaflow-core/src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::config::pipeline::PipelineConfig;
use crate::metrics::{forward_pipeline_metrics, pipeline_forward_read_metric_labels};
use crate::Result;
use numaflow_pb::clients::sink::sink_client::SinkClient;
use tokio::sync::mpsc::Receiver;
Expand Down Expand Up @@ -123,13 +125,19 @@ impl SinkHandle {
pub(super) struct SinkWriter {
batch_size: usize,
sink_handle: SinkHandle,
config: PipelineConfig,
}

impl SinkWriter {
pub(super) async fn new(batch_size: usize, sink_handle: SinkHandle) -> Result<Self> {
pub(super) async fn new(
batch_size: usize,
sink_handle: SinkHandle,
config: PipelineConfig,
) -> Result<Self> {
Ok(Self {
batch_size,
sink_handle,
config,
})
}

Expand All @@ -139,12 +147,43 @@ impl SinkWriter {
) -> Result<JoinHandle<()>> {
let handle: JoinHandle<()> = tokio::spawn({
let this = self.clone();

// let partition_name = format!(
// "default-{}-{}-{}",
// self.config.pipeline_name, self.config.vertex_name, self.config.replica
// );

// FIXME:
let partition: &str = self
.config
.from_vertex_config
.first()
.unwrap()
.reader_config
.streams
.first()
.unwrap()
.0
.as_ref();

let labels = pipeline_forward_read_metric_labels(
self.config.pipeline_name.as_ref(),
partition,
self.config.vertex_name.as_ref(),
"Sink",
self.config.replica,
);
async move {
loop {
let mut batch = Vec::with_capacity(this.batch_size);
for _ in 0..this.batch_size {
if let Some(read_message) = messages_rx.recv().await {
batch.push(read_message);
forward_pipeline_metrics()
.forwarder
.data_read
.get_or_create(labels)
.inc();
}
}

Expand Down