Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metrics. #186

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions cmd/maestro/server/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,20 +119,22 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut
}
}

// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and
// wrappedAuthStream wraps a grpc.ServerStream associated with an incoming RPC, and
// a custom context containing the user and groups derived from the client certificate
// specified in the incoming RPC metadata
type wrappedStream struct {
type wrappedAuthStream struct {
grpc.ServerStream
ctx context.Context
}

func (w *wrappedStream) Context() context.Context {
// Context returns the context associated with the stream
func (w *wrappedAuthStream) Context() context.Context {
return w.ctx
}

func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s, ctx}
// newWrappedAuthStream creates a new wrappedAuthStream
func newWrappedAuthStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream {
return &wrappedAuthStream{s, ctx}
}

// newAuthStreamInterceptor creates a stream interceptor that retrieves the user and groups
Expand Down Expand Up @@ -167,6 +169,6 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu
return fmt.Errorf("unsupported authentication Type %s", authNType)
}

return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss))
return handler(srv, newWrappedAuthStream(newContextWithIdentity(ss.Context(), user, groups), ss))
}
}
17 changes: 11 additions & 6 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
MaxVersion: tls.VersionTLS13,
}

// add metrics and auth interceptors
grpcServerOptions = append(grpcServerOptions,
grpc.ChainUnaryInterceptor(newMetricsUnaryInterceptor(), newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.ChainStreamInterceptor(newMetricsStreamInterceptor(), newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))

if config.GRPCAuthNType == "mtls" {
if len(config.ClientCAFile) == 0 {
check(fmt.Errorf("no client CA file specified when using mtls authorization type"), "Can't start gRPC server")
Expand All @@ -102,17 +107,17 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
tlsConfig.ClientCAs = certPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert

grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
glog.Infof("Serving gRPC service with mTLS at %s", config.ServerBindPort)
} else {
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)),
grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)),
grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer)))
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
glog.Infof("Serving gRPC service with TLS at %s", config.ServerBindPort)
}
} else {
// append metrics interceptor
grpcServerOptions = append(grpcServerOptions,
grpc.UnaryInterceptor(newMetricsUnaryInterceptor()),
grpc.StreamInterceptor(newMetricsStreamInterceptor()))
// Note: Do not use this in production.
glog.Infof("Serving gRPC service without TLS at %s", config.ServerBindPort)
}
Expand Down
242 changes: 242 additions & 0 deletions cmd/maestro/server/metrics_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package server

import (
"context"
"fmt"
"strings"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
)

// init registers the gRPC server metrics when the package is loaded, so the
// interceptors in this file can record into them without extra setup.
func init() {
	RegisterGRPCMetrics()
}

// newMetricsUnaryInterceptor creates a unary server interceptor that records
// server metrics. Only the Publish method carrying a *pbv1.PublishRequest is
// supported; any other method name or request type is rejected with an error
// before the handler is invoked.
//
// For every accepted call the interceptor:
//   - increments the called and message-received counters,
//   - invokes the handler and measures how long it takes,
//   - increments the message-sent counter only if the handler returned no error,
//   - increments the processed counter labelled with the gRPC status code
//     derived from the handler error, and observes the duration histogram.
//
// The "type" label is the method name and the "source" label is the source of
// the cloudevent carried by the request.
func newMetricsUnaryInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
		// The full method name must split on "/" into exactly three parts,
		// the last one being the method name used as the "type" label.
		// NOTE(review): this rejects every unary method other than Publish
		// that passes through this interceptor — confirm that is intended.
		methodInfo := strings.Split(info.FullMethod, "/")
		if len(methodInfo) != 3 || methodInfo[2] != "Publish" {
			return nil, fmt.Errorf("invalid method name: %s", info.FullMethod)
		}
		t := methodInfo[2]

		pubReq, ok := req.(*pbv1.PublishRequest)
		if !ok {
			return nil, fmt.Errorf("invalid request type for Publish method")
		}

		// Convert the request to a cloudevent to extract the source label.
		evt, err := binding.ToEvent(ctx, grpcprotocol.NewMessage(pubReq.Event))
		if err != nil {
			return nil, fmt.Errorf("failed to convert to cloudevent: %v", err)
		}
		source := evt.Source()

		grpcCalledCountMetric.WithLabelValues(t, source).Inc()
		grpcMessageReceivedCountMetric.WithLabelValues(t, source).Inc()

		startTime := time.Now()
		resp, err := handler(ctx, req)
		duration := time.Since(startTime).Seconds()

		// Count a sent message only when the handler produced a response
		// without error; a failed call does not send a response message.
		if err == nil {
			grpcMessageSentCountMetric.WithLabelValues(t, source).Inc()
		}

		// Derive the status code from the handler error. The local is named
		// code so the imported status package is not shadowed.
		code := statusFromError(err).Code()
		grpcProcessedCountMetric.WithLabelValues(t, source, code.String()).Inc()
		grpcProcessedDurationMetric.WithLabelValues(t, source).Observe(duration)

		return resp, err
	}
}

// wrappedMetricsStream decorates a grpc.ServerStream so that the stream
// interceptor can learn the request source and emit per-message metrics.
//
// Field order is significant: the constructor in this file builds the struct
// with a positional composite literal.
type wrappedMetricsStream struct {
	// t is the value used for the "type" metric label.
	t string
	// source points at a string owned by the creator of the stream; RecvMsg
	// writes the request source through it and SendMsg reads it back.
	source *string
	// ServerStream is the wrapped stream that performs the actual I/O.
	grpc.ServerStream
	// ctx is stored at construction time.
	// NOTE(review): no method visible here reads ctx — confirm whether a
	// Context() override was intended.
	ctx context.Context
}

// RecvMsg wraps the RecvMsg method of the embedded grpc.ServerStream.
//
// If the underlying receive fails, its error is returned unchanged and no
// metric is touched: nothing was received, so the source must not be
// overwritten and the counters must not be incremented. On success the message
// must be a *pbv1.SubscriptionRequest; its Source is stored through w.source
// and the called and message-received counters are incremented for it.
func (w *wrappedMetricsStream) RecvMsg(m interface{}) error {
	// Surface transport errors first so they are never masked by the type
	// check below and never counted as received messages.
	if err := w.ServerStream.RecvMsg(m); err != nil {
		return err
	}

	subReq, ok := m.(*pbv1.SubscriptionRequest)
	if !ok {
		return fmt.Errorf("invalid request type for Subscribe method")
	}

	// Publish the source so SendMsg and the interceptor can label metrics.
	*w.source = subReq.Source
	grpcCalledCountMetric.WithLabelValues(w.t, subReq.Source).Inc()
	grpcMessageReceivedCountMetric.WithLabelValues(w.t, subReq.Source).Inc()

	return nil
}

// SendMsg wraps the SendMsg method of the embedded grpc.ServerStream.
//
// The message-sent counter is incremented only when the underlying send
// succeeds, labelled with the stream type and the source last stored through
// w.source. The error from the underlying send is returned unchanged.
func (w *wrappedMetricsStream) SendMsg(m interface{}) error {
	err := w.ServerStream.SendMsg(m)
	if err == nil {
		// A failed send did not deliver a message, so it is not counted.
		grpcMessageSentCountMetric.WithLabelValues(w.t, *w.source).Inc()
	}
	return err
}

// newWrappedMetricsStream builds a wrappedMetricsStream around ss.
//
// t is the "type" label value, source is the location the stream writes the
// request source to, and ctx is stored on the wrapper alongside the stream.
func newWrappedMetricsStream(t string, source *string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
	wrapped := &wrappedMetricsStream{
		t:            t,
		source:       source,
		ServerStream: ss,
		ctx:          ctx,
	}
	return wrapped
}

// newMetricsStreamInterceptor creates a stream server interceptor that records
// server metrics. Only the server-streaming Subscribe method is accepted; any
// other stream shape or method name is rejected with an error before the
// handler runs.
//
// The handler is given a wrapped stream that captures the request source and
// counts messages; once the handler returns, the processed counter is
// incremented with the gRPC status code derived from the handler error.
func newMetricsStreamInterceptor() grpc.StreamServerInterceptor {
	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
		// Accept only streams that are server-streaming and not client-streaming.
		serverStreamingOnly := info.IsServerStream && !info.IsClientStream
		if !serverStreamingOnly {
			return fmt.Errorf("invalid stream type for stream method: %s", info.FullMethod)
		}

		// The last of the three "/"-separated parts is the method name,
		// which doubles as the "type" label.
		parts := strings.Split(info.FullMethod, "/")
		if len(parts) != 3 || parts[2] != "Subscribe" {
			return fmt.Errorf("invalid method name for stream method: %s", info.FullMethod)
		}
		method := parts[2]

		// requestSource is filled in by the wrapped stream when the
		// subscription request is received.
		var requestSource string
		metricsStream := newWrappedMetricsStream(method, &requestSource, stream.Context(), stream)
		handlerErr := handler(srv, metricsStream)

		// Translate the handler error into a status code for the label.
		code := statusFromError(handlerErr).Code()
		grpcProcessedCountMetric.WithLabelValues(method, requestSource, code.String()).Inc()

		return handlerErr
	}
}

// statusFromError converts err into a grpc status. Errors that already carry a
// grpc status are returned as that status; everything else goes through
// status.FromContextError, mirroring what the grpc server itself does so that
// context errors are also mapped to a status. If the error is neither a valid
// grpc status nor a context error, codes.Unknown will be set.
func statusFromError(err error) *status.Status {
	if s, ok := status.FromError(err); ok {
		return s
	}
	return status.FromContextError(err)
}

// grpcMetricsSubsystem is the subsystem under which all gRPC server metrics
// are defined.
const grpcMetricsSubsystem = "grpc_server"

// Label names attached to the gRPC server metrics.
const (
	grpcMetricsTypeLabel   = "type"
	grpcMetricsSourceLabel = "source"
	grpcMetricsCodeLabel   = "code"
)

var (
	// grpcMetricsLabels is the label set shared by every metric: type and source.
	grpcMetricsLabels = []string{grpcMetricsTypeLabel, grpcMetricsSourceLabel}

	// grpcMetricsAllLabels is the full label set: type, source and code.
	grpcMetricsAllLabels = []string{grpcMetricsTypeLabel, grpcMetricsSourceLabel, grpcMetricsCodeLabel}
)

// Metric names, relative to the subsystem.
const (
	calledCountMetric          = "called_total"
	processedCountMetric       = "processed_total"
	processedDurationMetric    = "processed_duration_seconds"
	messageReceivedCountMetric = "message_received_total"
	messageSentCountMetric     = "message_sent_total"
)

// RegisterGRPCMetrics registers all gRPC server metrics with the default
// prometheus registerer, in declaration order. It uses MustRegister, so a
// registration failure panics.
func RegisterGRPCMetrics() {
	prometheus.MustRegister(
		grpcCalledCountMetric,
		grpcProcessedCountMetric,
		grpcProcessedDurationMetric,
		grpcMessageReceivedCountMetric,
		grpcMessageSentCountMetric,
	)
}

// UnregisterGRPCMetrics removes all gRPC server metrics from the default
// prometheus registerer. The result of each Unregister call is ignored.
func UnregisterGRPCMetrics() {
	collectors := []prometheus.Collector{
		grpcCalledCountMetric,
		grpcProcessedCountMetric,
		grpcProcessedDurationMetric,
		grpcMessageReceivedCountMetric,
		grpcMessageSentCountMetric,
	}
	for _, c := range collectors {
		prometheus.Unregister(c)
	}
}

// ResetGRPCMetrics calls Reset on every gRPC server metric vector, in
// declaration order.
func ResetGRPCMetrics() {
	vectors := []interface{ Reset() }{
		grpcCalledCountMetric,
		grpcProcessedCountMetric,
		grpcProcessedDurationMetric,
		grpcMessageReceivedCountMetric,
		grpcMessageSentCountMetric,
	}
	for _, v := range vectors {
		v.Reset()
	}
}

// gRPC server metric vectors. All of them live under grpcMetricsSubsystem and
// are labelled with type and source; the processed counter additionally
// carries the status code label.
var (
	// grpcCalledCountMetric counts RPCs called on the server.
	grpcCalledCountMetric = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: grpcMetricsSubsystem,
			Name:      calledCountMetric,
			Help:      "Total number of RPCs called on the server.",
		},
		grpcMetricsLabels,
	)

	// grpcProcessedCountMetric counts RPCs processed on the server, whether
	// they succeeded or failed, broken down by status code.
	grpcProcessedCountMetric = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: grpcMetricsSubsystem,
			Name:      processedCountMetric,
			Help:      "Total number of RPCs processed on the server, regardless of success or failure.",
		},
		grpcMetricsAllLabels,
	)

	// grpcProcessedDurationMetric is a histogram of RPC processing durations,
	// using the prometheus default buckets.
	grpcProcessedDurationMetric = prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Subsystem: grpcMetricsSubsystem,
			Name:      processedDurationMetric,
			Help:      "Histogram of the duration of RPCs processed on the server.",
			Buckets:   prometheus.DefBuckets,
		},
		grpcMetricsLabels,
	)

	// grpcMessageReceivedCountMetric counts messages received by the server.
	grpcMessageReceivedCountMetric = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: grpcMetricsSubsystem,
			Name:      messageReceivedCountMetric,
			Help:      "Total number of messages received on the server from agent and client.",
		},
		grpcMetricsLabels,
	)

	// grpcMessageSentCountMetric counts messages sent by the server.
	grpcMessageSentCountMetric = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Subsystem: grpcMetricsSubsystem,
			Name:      messageSentCountMetric,
			Help:      "Total number of messages sent by the server to agent and client.",
		},
		grpcMetricsLabels,
	)
)
Loading