Skip to content
This repository has been archived by the owner on Apr 18, 2023. It is now read-only.

Use stats handler instead of interceptor and add message size histograms #88

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ var (

// StreamClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Streaming RPCs.
StreamClientInterceptor = DefaultClientMetrics.StreamClientInterceptor()

// ClientStatsHandler is a gRPC client-side stats.Handler that provides Prometheus monitoring for RPCs.
ClientStatsHandler = DefaultClientMetrics.NewClientStatsHandler()
)

func init() {
Expand Down Expand Up @@ -55,3 +58,21 @@ func EnableClientStreamSendTimeHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableClientStreamSendTimeHistogram(opts...)
prom.Register(DefaultClientMetrics.clientStreamSendHistogram)
}

// EnableClientMsgSizeReceivedBytesHistogram turns on recording of
// single message send time of streaming RPCs.
// This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableMsgSizeReceivedBytesHistogram(opts...)
prom.Register(DefaultClientMetrics.clientMsgSizeReceivedHistogram)
}

// EnableClientMsgSizeSentBytesHistogram turns on recording of
// single message send time of streaming RPCs.
// This function acts on the DefaultClientMetrics variable and the
// default Prometheus metrics registry.
func EnableClientMsgSizeSentBytesHistogram(opts ...HistogramOption) {
DefaultClientMetrics.EnableMsgSizeSentBytesHistogram(opts...)
prom.Register(DefaultClientMetrics.clientMsgSizeSentHistogram)
}
85 changes: 80 additions & 5 deletions client_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

// ClientMetrics represents a collection of metrics to be registered on a
// Prometheus metrics registry for a gRPC client.
type ClientMetrics struct {
clientStartedCounter *prom.CounterVec
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec
clientStartedCounter *prom.CounterVec
clientStartedCounterOpts prom.CounterOpts
clientHandledCounter *prom.CounterVec
clientStreamMsgReceived *prom.CounterVec
clientStreamMsgSent *prom.CounterVec

clientHandledHistogramEnabled bool
clientHandledHistogramOpts prom.HistogramOpts
Expand All @@ -29,6 +31,14 @@ type ClientMetrics struct {
clientStreamSendHistogramEnabled bool
clientStreamSendHistogramOpts prom.HistogramOpts
clientStreamSendHistogram *prom.HistogramVec

clientMsgSizeReceivedHistogramEnabled bool
clientMsgSizeReceivedHistogramOpts prom.HistogramOpts
clientMsgSizeReceivedHistogram *prom.HistogramVec

clientMsgSizeSentHistogramEnabled bool
clientMsgSizeSentHistogramOpts prom.HistogramOpts
clientMsgSizeSentHistogram *prom.HistogramVec
}

// NewClientMetrics returns a ClientMetrics object. Use a new instance of
Expand Down Expand Up @@ -82,7 +92,21 @@ func NewClientMetrics(counterOpts ...CounterOption) *ClientMetrics {
Help: "Histogram of response latency (seconds) of the gRPC single message send.",
Buckets: prom.DefBuckets,
},
clientStreamSendHistogram: nil,
clientStreamSendHistogram: nil,
clientMsgSizeReceivedHistogramEnabled: false,
clientMsgSizeReceivedHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_size_received_bytes",
Help: "Histogram of message sizes received by the client.",
Buckets: defMsgBytesBuckets,
},
clientMsgSizeReceivedHistogram: nil,
clientMsgSizeSentHistogramEnabled: false,
clientMsgSizeSentHistogramOpts: prom.HistogramOpts{
Name: "grpc_client_msg_size_sent_bytes",
Help: "Histogram of message sizes sent by the client.",
Buckets: defMsgBytesBuckets,
},
clientMsgSizeSentHistogram: nil,
}
}

Expand All @@ -103,6 +127,12 @@ func (m *ClientMetrics) Describe(ch chan<- *prom.Desc) {
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Describe(ch)
}
if m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram.Describe(ch)
}
if m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram.Describe(ch)
}
}

// Collect is called by the Prometheus registry when collecting
Expand All @@ -122,6 +152,12 @@ func (m *ClientMetrics) Collect(ch chan<- prom.Metric) {
if m.clientStreamSendHistogramEnabled {
m.clientStreamSendHistogram.Collect(ch)
}
if m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram.Collect(ch)
}
if m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram.Collect(ch)
}
}

// EnableClientHandlingTimeHistogram turns on recording of handling time of RPCs.
Expand Down Expand Up @@ -173,6 +209,38 @@ func (m *ClientMetrics) EnableClientStreamSendTimeHistogram(opts ...HistogramOpt
m.clientStreamSendHistogramEnabled = true
}

// EnableMsgSizeReceivedBytesHistogram turns on recording of received message size of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query. It takes
// options to configure histogram options such as the defined buckets.
func (m *ClientMetrics) EnableMsgSizeReceivedBytesHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientMsgSizeReceivedHistogramOpts)
}
if !m.clientMsgSizeReceivedHistogramEnabled {
m.clientMsgSizeReceivedHistogram = prom.NewHistogramVec(
m.clientMsgSizeReceivedHistogramOpts,
[]string{"grpc_service", "grpc_method", "grpc_stats"},
)
}
m.clientMsgSizeReceivedHistogramEnabled = true
}

// EnableMsgSizeSentBytesHistogram turns on recording of sent message size of RPCs.
// Histogram metrics can be very expensive for Prometheus to retain and query. It
// takes options to configure histogram options such as the defined buckets.
func (m *ClientMetrics) EnableMsgSizeSentBytesHistogram(opts ...HistogramOption) {
for _, o := range opts {
o(&m.clientMsgSizeSentHistogramOpts)
}
if !m.clientMsgSizeSentHistogramEnabled {
m.clientMsgSizeSentHistogram = prom.NewHistogramVec(
m.clientMsgSizeSentHistogramOpts,
[]string{"grpc_service", "grpc_method", "grpc_stats"},
)
}
m.clientMsgSizeSentHistogramEnabled = true
}

// UnaryClientInterceptor is a gRPC client-side interceptor that provides Prometheus monitoring for Unary RPCs.
func (m *ClientMetrics) UnaryClientInterceptor() func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
Expand Down Expand Up @@ -202,6 +270,13 @@ func (m *ClientMetrics) StreamClientInterceptor() func(ctx context.Context, desc
}
}

// NewClientStatsHandler is a gRPC client-side stats.Handler that providers Prometheus monitoring for RPCs.
func (m *ClientMetrics) NewClientStatsHandler() stats.Handler {
return &clientStatsHandler{
clientMetrics: m,
}
}

func clientStreamType(desc *grpc.StreamDesc) grpcType {
if desc.ClientStreams && !desc.ServerStreams {
return ClientStream
Expand Down
38 changes: 38 additions & 0 deletions client_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package grpc_prometheus

import (
"fmt"
"time"

"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -31,6 +32,16 @@ func newClientReporter(m *ClientMetrics, rpcType grpcType, fullMethod string) *c
return r
}

func newClientReporterForStatsHanlder(startTime time.Time, m *ClientMetrics, fullMethod string) *clientReporter {
r := &clientReporter{
metrics: m,
rpcType: Unary,
startTime: startTime,
}
r.serviceName, r.methodName = splitMethodName(fullMethod)
return r
}

// timer is a helper interface to time functions.
type timer interface {
ObserveDuration() time.Duration
Expand All @@ -54,10 +65,25 @@ func (r *clientReporter) ReceiveMessageTimer() timer {
return emptyTimer
}

func (r *clientReporter) StartedConn() {
r.metrics.clientStartedCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

func (r *clientReporter) ReceivedMessage() {
r.metrics.clientStreamMsgReceived.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

// ReceivedMessageSize counts the size of received messages on client-side
func (r *clientReporter) ReceivedMessageSize(rpcStats grpcStats, size float64) {
if rpcStats == Payload {
r.ReceivedMessage()
}

if r.metrics.clientMsgSizeReceivedHistogramEnabled {
r.metrics.clientMsgSizeReceivedHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
}
}

func (r *clientReporter) SendMessageTimer() timer {
if r.metrics.clientStreamSendHistogramEnabled {
hist := r.metrics.clientStreamSendHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName)
Expand All @@ -71,9 +97,21 @@ func (r *clientReporter) SentMessage() {
r.metrics.clientStreamMsgSent.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Inc()
}

// SentMessageSize counts the size of sent messages on client-side
func (r *clientReporter) SentMessageSize(rpcStats grpcStats, size float64) {
if rpcStats == Payload {
r.SentMessage()
}

if r.metrics.clientMsgSizeSentHistogramEnabled {
r.metrics.clientMsgSizeSentHistogram.WithLabelValues(r.serviceName, r.methodName, string(rpcStats)).Observe(size)
}
}

func (r *clientReporter) Handled(code codes.Code) {
r.metrics.clientHandledCounter.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName, code.String()).Inc()
if r.metrics.clientHandledHistogramEnabled {
fmt.Printf("client handled count + 1: %v,%f\n", code, time.Since(r.startTime).Seconds())
r.metrics.clientHandledHistogram.WithLabelValues(string(r.rpcType), r.serviceName, r.methodName).Observe(time.Since(r.startTime).Seconds())
}
}
58 changes: 58 additions & 0 deletions client_stats_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package grpc_prometheus

import (
"context"

"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)

type clientStatsHandler struct {
clientMetrics *ClientMetrics
}

// TagRPC implements the stats.Hanlder interface.
func (h *clientStatsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
rpcInfo := newRPCInfo(info.FullMethodName)
return context.WithValue(ctx, &rpcInfoKey, rpcInfo)
}

// HandleRPC implements the stats.Hanlder interface.
func (h *clientStatsHandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
v, ok := ctx.Value(&rpcInfoKey).(*rpcInfo)
if !ok {
return
}
monitor := newClientReporterForStatsHanlder(v.startTime, h.clientMetrics, v.fullMethodName)
switch s := s.(type) {
case *stats.Begin:
v.startTime = s.BeginTime
monitor.StartedConn()
case *stats.End:
monitor.Handled(status.Code(s.Error))
case *stats.InHeader:
monitor.ReceivedMessageSize(Header, float64(s.WireLength))
case *stats.InPayload:
// TODO: remove the +5 offset on wire length here, which is a temporary stand-in for the missing grpc framing offset
// See: https://github.com/grpc/grpc-go/issues/1647
monitor.ReceivedMessageSize(Payload, float64(s.WireLength+5))
case *stats.InTrailer:
monitor.ReceivedMessageSize(Tailer, float64(s.WireLength))
case *stats.OutHeader:
// TODO: Add the sent header message size stats, if the wire length of the send header is provided
case *stats.OutPayload:
// TODO(tonywang): response latency (seconds) of the gRPC single message send
monitor.SentMessageSize(Payload, float64(s.WireLength))
case *stats.OutTrailer:
monitor.SentMessageSize(Tailer, float64(s.WireLength))
}
}

// TagConn implements the stats.Hanlder interface.
func (h *clientStatsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
}

// HandleConn implements the stats.Hanlder interface.
func (h *clientStatsHandler) HandleConn(ctx context.Context, s stats.ConnStats) {
}
Loading