diff --git a/server.go b/server.go index 322f990..3a532ee 100644 --- a/server.go +++ b/server.go @@ -46,3 +46,21 @@ func EnableHandlingTimeHistogram(opts ...HistogramOption) { DefaultServerMetrics.EnableHandlingTimeHistogram(opts...) prom.Register(DefaultServerMetrics.serverHandledHistogram) } + +// EnableServerStreamReceiveTimeHistogram turns on recording of +// single message receive time of streaming RPCs. +// This function acts on the DefaultServerMetrics variable and the +// default Prometheus metrics registry. +func EnableServerStreamReceiveTimeHistogram(opts ...HistogramOption) { + DefaultServerMetrics.EnableServerStreamReceiveTimeHistogram(opts...) + prom.Register(DefaultServerMetrics.serverStreamRecvHistogram) +} + +// EnableServerStreamSendTimeHistogram turns on recording of +// single message send time of streaming RPCs. +// This function acts on the DefaultServerMetrics variable and the +// default Prometheus metrics registry. +func EnableServerStreamSendTimeHistogram(opts ...HistogramOption) { + DefaultServerMetrics.EnableServerStreamSendTimeHistogram(opts...) + prom.Register(DefaultServerMetrics.serverStreamSendHistogram) +} diff --git a/server_metrics.go b/server_metrics.go index d28a46e..50f587b 100644 --- a/server_metrics.go +++ b/server_metrics.go @@ -2,6 +2,7 @@ package grpc_prometheus import ( "context" + "github.com/grpc-ecosystem/go-grpc-prometheus/packages/grpcstatus" prom "github.com/prometheus/client_golang/prometheus" @@ -18,6 +19,14 @@ type ServerMetrics struct { serverHandledHistogramEnabled bool serverHandledHistogramOpts prom.HistogramOpts serverHandledHistogram *prom.HistogramVec + + serverStreamRecvHistogramEnabled bool + serverStreamRecvHistogramOpts prom.HistogramOpts + serverStreamRecvHistogram *prom.HistogramVec + + serverStreamSendHistogramEnabled bool + serverStreamSendHistogramOpts prom.HistogramOpts + serverStreamSendHistogram *prom.HistogramVec } // NewServerMetrics returns a ServerMetrics object. Use a new instance of @@ -53,7 +62,21 @@ func NewServerMetrics(counterOpts ...CounterOption) *ServerMetrics { Help: "Histogram of response latency (seconds) of gRPC that had been application-level handled by the server.", Buckets: prom.DefBuckets, }, - serverHandledHistogram: nil, + serverHandledHistogram: nil, + serverStreamRecvHistogramEnabled: false, + serverStreamRecvHistogramOpts: prom.HistogramOpts{ + Name: "grpc_server_msg_recv_handling_seconds", + Help: "Histogram of response latency (seconds) of the gRPC single message receive.", + Buckets: prom.DefBuckets, + }, + serverStreamRecvHistogram: nil, + serverStreamSendHistogramEnabled: false, + serverStreamSendHistogramOpts: prom.HistogramOpts{ + Name: "grpc_server_msg_send_handling_seconds", + Help: "Histogram of response latency (seconds) of the gRPC single message send.", + Buckets: prom.DefBuckets, + }, + serverStreamSendHistogram: nil, } } @@ -74,6 +97,40 @@ func (m *ServerMetrics) EnableHandlingTimeHistogram(opts ...HistogramOption) { m.serverHandledHistogramEnabled = true } +// EnableServerStreamReceiveTimeHistogram turns on recording of single message receive time of streaming RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. +func (m *ServerMetrics) EnableServerStreamReceiveTimeHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.serverStreamRecvHistogramOpts) + } + + if !m.serverStreamRecvHistogramEnabled { + m.serverStreamRecvHistogram = prom.NewHistogramVec( + m.serverStreamRecvHistogramOpts, + []string{"grpc_type", "grpc_service", "grpc_method"}, + ) + } + + m.serverStreamRecvHistogramEnabled = true +} + +// EnableServerStreamSendTimeHistogram turns on recording of single message send time of streaming RPCs. +// Histogram metrics can be very expensive for Prometheus to retain and query. +func (m *ServerMetrics) EnableServerStreamSendTimeHistogram(opts ...HistogramOption) { + for _, o := range opts { + o(&m.serverStreamSendHistogramOpts) + } + + if !m.serverStreamSendHistogramEnabled { + m.serverStreamSendHistogram = prom.NewHistogramVec( + m.serverStreamSendHistogramOpts, + []string{"grpc_type", "grpc_service", "grpc_method"}, + ) + } + + m.serverStreamSendHistogramEnabled = true +} + // Describe sends the super-set of all possible descriptors of metrics // collected by this Collector to the provided channel and returns once // the last descriptor has been sent. @@ -85,6 +142,12 @@ func (m *ServerMetrics) Describe(ch chan<- *prom.Desc) { if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Describe(ch) } + if m.serverStreamRecvHistogramEnabled { + m.serverStreamRecvHistogram.Describe(ch) + } + if m.serverStreamSendHistogramEnabled { + m.serverStreamSendHistogram.Describe(ch) + } } // Collect is called by the Prometheus registry when collecting @@ -98,6 +161,12 @@ func (m *ServerMetrics) Collect(ch chan<- prom.Metric) { if m.serverHandledHistogramEnabled { m.serverHandledHistogram.Collect(ch) } + if m.serverStreamRecvHistogramEnabled { + m.serverStreamRecvHistogram.Collect(ch) + } + if m.serverStreamSendHistogramEnabled { + m.serverStreamSendHistogram.Collect(ch) + } } // UnaryServerInterceptor is a gRPC server-side interceptor that provides Prometheus monitoring for Unary RPCs. @@ -154,7 +223,9 @@ type monitoredServerStream struct { } func (s *monitoredServerStream) SendMsg(m interface{}) error { + timer := s.monitor.SendMessageTimer() err := s.ServerStream.SendMsg(m) + timer.ObserveDuration() if err == nil { s.monitor.SentMessage() } @@ -162,7 +233,9 @@ func (s *monitoredServerStream) SendMsg(m interface{}) error { } func (s *monitoredServerStream) RecvMsg(m interface{}) error { + timer := s.monitor.ReceiveMessageTimer() err := s.ServerStream.RecvMsg(m) + timer.ObserveDuration() if err == nil { s.monitor.ReceivedMessage() } diff --git a/server_reporter.go b/server_reporter.go index aa9db54..2cf71e5 100644 --- a/server_reporter.go +++ b/server_reporter.go @@ -6,6 +6,7 @@ package grpc_prometheus import ( "time" + "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc/codes" ) @@ -30,10 +31,28 @@ func newServerReporter(m *ServerMetrics, rpcType grpcType, fullMethod string) *s return r } +func (r *serverReporter) ReceiveMessageTimer() timer { + if r.metrics.serverStreamRecvHistogramEnabled { + hist := r.metrics.serverStreamRecvHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName) + return prometheus.NewTimer(hist) + } + + return emptyTimer +} + func (r *serverReporter) ReceivedMessage() { r.metrics.serverStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() } +func (r *serverReporter) SendMessageTimer() timer { + if r.metrics.serverStreamSendHistogramEnabled { + hist := r.metrics.serverStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName) + return prometheus.NewTimer(hist) + } + + return emptyTimer +} + func (r *serverReporter) SentMessage() { r.metrics.serverStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc() }