Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Add server streaming message receive and send histograms #94

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,21 @@ func EnableHandlingTimeHistogram(opts ...HistogramOption) {
DefaultServerMetrics.EnableHandlingTimeHistogram(opts...)
prom.Register(DefaultServerMetrics.serverHandledHistogram)
}

// EnableServerStreamReceiveTimeHistogram turns on recording of
// single message receive time of streaming RPCs.
// This function acts on the DefaultServerMetrics variable and the
// default Prometheus metrics registry.
func EnableServerStreamReceiveTimeHistogram(opts ...HistogramOption) {
DefaultServerMetrics.EnableServerStreamReceiveTimeHistogram(opts...)
prom.Register(DefaultServerMetrics.serverStreamRecvHistogram)
}

// EnableServerStreamSendTimeHistogram turns on recording of
// single message send time of streaming RPCs.
// This function acts on the DefaultServerMetrics variable and the
// default Prometheus metrics registry.
func EnableServerStreamSendTimeHistogram(opts ...HistogramOption) {
DefaultServerMetrics.EnableServerStreamSendTimeHistogram(opts...)
prom.Register(DefaultServerMetrics.serverStreamSendHistogram)
}
75 changes: 74 additions & 1 deletion server_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package grpc_prometheus

import (
"context"

"github.com/grpc-ecosystem/go-grpc-prometheus/packages/grpcstatus"
prom "github.com/prometheus/client_golang/prometheus"

Expand All @@ -18,6 +19,14 @@ type ServerMetrics struct {
serverHandledHistogramEnabled bool
serverHandledHistogramOpts prom.HistogramOpts
serverHandledHistogram *prom.HistogramVec

serverStreamRecvHistogramEnabled bool
serverStreamRecvHistogramOpts prom.HistogramOpts
serverStreamRecvHistogram *prom.HistogramVec

serverStreamSendHistogramEnabled bool
serverStreamSendHistogramOpts prom.HistogramOpts
serverStreamSendHistogram *prom.HistogramVec
}

// NewServerMetrics returns a ServerMetrics object. Use a new instance of
Expand Down Expand Up @@ -53,7 +62,21 @@ func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics {
Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.",
Buckets: prom.DefBuckets,
},
serverHandledHistogram: nil,
serverHandledHistogram: nil,
serverStreamRecvHistogramEnabled: false,
serverStreamRecvHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_msg_recv_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message receive.",
Buckets: prom.DefBuckets,
},
serverStreamRecvHistogram: nil,
serverStreamSendHistogramEnabled: false,
serverStreamSendHistogramOpts: prom.HistogramOpts{
Name: "grpc_server_msg_send_handling_seconds",
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prom.DefBuckets,
},
serverStreamSendHistogram: nil,
}
}

Expand All @@ -74,6 +97,40 @@ func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) {
m.serverHandledHistogramEnabled = true
}

// EnableServerStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ServerMetrics) EnableServerStreamReceiveTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.serverStreamRecvHistogramOpts)
}

if !m.serverStreamRecvHistogramEnabled {
m.serverStreamRecvHistogram = prom.NewHistogramVec(
m.serverStreamRecvHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.serverStreamRecvHistogramEnabled = true
}

// EnableServerStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query.
func (m *ServerMetrics) EnableServerStreamSendTimeHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.serverStreamSendHistogramOpts)
}

if !m.serverStreamSendHistogramEnabled {
m.serverStreamSendHistogram = prom.NewHistogramVec(
m.serverStreamSendHistogramOpts,
[]string{"grpc_type", "grpc_service", "grpc_method"},
)
}

m.serverStreamSendHistogramEnabled = true
}

// Describe sends the super-set of all possible descriptors of metrics
// collected by this Collector to the provided channel and returns once
// the last descriptor has been sent.
Expand All @@ -85,6 +142,12 @@ func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) {
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Describe(ch)
}
if m.serverStreamRecvHistogramEnabled {
m.serverStreamRecvHistogram.Describe(ch)
}
if m.serverStreamSendHistogramEnabled {
m.serverStreamSendHistogram.Describe(ch)
}
}

// Collect is called by the Prometheus registry when collecting
Expand All @@ -98,6 +161,12 @@ func (m *ServerMetrics) Collect(ch chan<- prom.Metric) {
if m.serverHandledHistogramEnabled {
m.serverHandledHistogram.Collect(ch)
}
if m.serverStreamRecvHistogramEnabled {
m.serverStreamRecvHistogram.Collect(ch)
}
if m.serverStreamSendHistogramEnabled {
m.serverStreamSendHistogram.Collect(ch)
}
}

// UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs.
Expand Down Expand Up @@ -154,15 +223,19 @@ type monitoredServerStream struct {
}

func (s *monitoredServerStream) SendMsg(m interface{}) error {
timer := s.monitor.SendMessageTimer()
err := s.ServerStream.SendMsg(m)
timer.ObserveDuration()
if err == nil {
s.monitor.SentMessage()
}
return err
}

func (s *monitoredServerStream) RecvMsg(m interface{}) error {
timer := s.monitor.ReceiveMessageTimer()
err := s.ServerStream.RecvMsg(m)
timer.ObserveDuration()
if err == nil {
s.monitor.ReceivedMessage()
}
Expand Down
19 changes: 19 additions & 0 deletions server_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package grpc_prometheus
import (
"time"

"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/codes"
)

Expand All @@ -30,10 +31,28 @@ func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *s
return r
}

func (r *serverReporter) ReceiveMessageTimer() timer {
if r.metrics.serverStreamRecvHistogramEnabled {
hist := r.metrics.serverStreamRecvHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
return prometheus.NewTimer(hist)
}

return emptyTimer
}

func (r *serverReporter) ReceivedMessage() {
r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *serverReporter) SendMessageTimer() timer {
if r.metrics.serverStreamSendHistogramEnabled {
hist := r.metrics.serverStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
return prometheus.NewTimer(hist)
}

return emptyTimer
}

func (r *serverReporter) SentMessage() {
r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}
Expand Down