Skip to content

Commit

Permalink
add metrics.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao committed Sep 3, 2024
1 parent f4d9da7 commit d33e9a2
Show file tree
Hide file tree
Showing 8 changed files with 539 additions and 50 deletions.
5 changes: 5 additions & 0 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: config.MaxConnectionAge,
}))
// add metrics interceptor
grpcServerOptions = append(grpcServerOptions,
grpc.UnaryInterceptor(newMetricsUnaryInterceptor()),
grpc.StreamInterceptor(newMetricsStreamInterceptor()),
)

if config.EnableTLS {
// Check tls cert and key path path
Expand Down
246 changes: 246 additions & 0 deletions cmd/maestro/server/metrics_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
package server

import (
"context"
"fmt"
"strings"
"time"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1"
grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
)

func init() {
// Register the metrics:
RegisterGRPCMetrics()
}

// NewMetricsUnaryInterceptor creates a unary server interceptor for server metrics.
// Currently supports the Publish method with PublishRequest.
func newMetricsUnaryInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// extract the type from the method name
methodInfo := strings.Split(info.FullMethod, "/")
if len(methodInfo) != 3 || methodInfo[2] != "Publish" {
return nil, fmt.Errorf("invalid method name: %s", info.FullMethod)
}
t := methodInfo[2]
pubReq, ok := req.(*pbv1.PublishRequest)
if !ok {
return nil, fmt.Errorf("invalid request type for Publish method")
}
// convert the request to cloudevent and extract the source
evt, err := binding.ToEvent(ctx, grpcprotocol.NewMessage(pubReq.Event))
if err != nil {
return nil, fmt.Errorf("failed to convert to cloudevent: %v", err)
}
source := evt.Source()
grpcStartCountMetric.WithLabelValues(t, source).Inc()

grpcMsgReceivedCountMetric.WithLabelValues(t, source).Inc()
startTime := time.Now()
resp, err := handler(ctx, req)
duration := time.Since(startTime).Seconds()
grpcMsgSentCountMetric.WithLabelValues(t, source).Inc()

// get status code from error
status := statusFromError(err)
code := status.Code()
grpcHandledCountMetric.WithLabelValues(t, source, code.String()).Inc()
grpcHandledDurationMetric.WithLabelValues(t, source).Observe(duration)

return resp, err
}
}

// wrappedStream wraps a grpc.ServerStream, capturing the request source
// emitting metrics for the stream interceptor.
type wrappedStream struct {
t string
source *string
grpc.ServerStream
ctx context.Context
}

// RecvMsg wraps the RecvMsg method of the embedded grpc.ServerStream.
// It captures the source from the SubscriptionRequest and emits metrics.
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
subReq, ok := m.(*pbv1.SubscriptionRequest)
if !ok {
return fmt.Errorf("invalid request type for Subscribe method")
}
*w.source = subReq.Source
grpcStartCountMetric.WithLabelValues(w.t, subReq.Source).Inc()
grpcMsgReceivedCountMetric.WithLabelValues(w.t, subReq.Source).Inc()

return err
}

// SendMsg wraps the SendMsg method of the embedded grpc.ServerStream.
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
grpcMsgSentCountMetric.WithLabelValues(w.t, *w.source).Inc()
return err
}

// newWrappedStream creates a wrappedStream with the specified type and source reference.
func newWrappedStream(t string, source *string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{t, source, ss, ctx}
}

// newMetricsStreamInterceptor creates a stream server interceptor for server metrics.
// Currently supports the Subscribe method with SubscriptionRequest.
func newMetricsStreamInterceptor() grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// extract the type from the method name
if !info.IsServerStream || info.IsClientStream {
return fmt.Errorf("invalid stream type for stream method: %s", info.FullMethod)
}
methodInfo := strings.Split(info.FullMethod, "/")
if len(methodInfo) != 3 || methodInfo[2] != "Subscribe" {
return fmt.Errorf("invalid method name for stream method: %s", info.FullMethod)
}
t := methodInfo[2]
source := ""

// create a wrapped stream to capture the source and emit metrics
wrappedStream := newWrappedStream(t, &source, stream.Context(), stream)
startTime := time.Now()
err := handler(srv, wrappedStream)
duration := time.Since(startTime).Seconds()

// get status code from error
status := statusFromError(err)
code := status.Code()
grpcHandledCountMetric.WithLabelValues(t, source, code.String()).Inc()
grpcHandledDurationMetric.WithLabelValues(t, source).Observe(duration)

return err
}
}

// statusFromError returns a grpc status. If the error code is neither a valid grpc status
// nor a context error, codes.Unknown will be set.
func statusFromError(err error) *status.Status {
s, ok := status.FromError(err)
// Mirror what the grpc server itself does, i.e. also convert context errors to status
if !ok {
s = status.FromContextError(err)
}
return s
}

// Subsystem used to define the metrics:
const grpcMetricsSubsystem = "maestro_grpc_server"

// Names of the labels added to metrics:
const (
grpcMetricsTypeLabel = "type"
grpcMetricsSourceLabel = "source"
grpcMetricsCodeLabel = "code"
)

// grpcMetricsLabels - Array of labels added to metrics:
var grpcMetricsLabels = []string{
grpcMetricsTypeLabel,
grpcMetricsSourceLabel,
}

// grpcMetricsAllLabels - Array of all labels added to metrics:
var grpcMetricsAllLabels = []string{
grpcMetricsTypeLabel,
grpcMetricsSourceLabel,
grpcMetricsCodeLabel,
}

// Names of the metrics:
const (
startCountMetric = "started_total"
handledCountMetric = "handled_total"
handledDurationMetric = "handled_duration_seconds"
msgReceivedCountMetric = "msg_received_total"
msgSentCountMetric = "msg_sent_total"
)

// Register the metrics:
func RegisterGRPCMetrics() {
prometheus.MustRegister(grpcStartCountMetric)
prometheus.MustRegister(grpcHandledCountMetric)
prometheus.MustRegister(grpcHandledDurationMetric)
prometheus.MustRegister(grpcMsgReceivedCountMetric)
prometheus.MustRegister(grpcMsgSentCountMetric)
}

// Unregister the metrics:
func UnregisterGRPCMetrics() {
prometheus.Unregister(grpcStartCountMetric)
prometheus.Unregister(grpcHandledCountMetric)
prometheus.Unregister(grpcHandledDurationMetric)
prometheus.Unregister(grpcMsgReceivedCountMetric)
prometheus.Unregister(grpcMsgSentCountMetric)
}

// Reset the metrics:
func ResetGRPCMetrics() {
grpcStartCountMetric.Reset()
grpcHandledCountMetric.Reset()
grpcHandledDurationMetric.Reset()
grpcMsgReceivedCountMetric.Reset()
grpcMsgSentCountMetric.Reset()
}

// Description of the gRPC start count metric:
var grpcStartCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: startCountMetric,
Help: "Total number of RPCs started on the server.",
},
grpcMetricsLabels,
)

// Description of the gRPC handled count metric:
var grpcHandledCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: handledCountMetric,
Help: "Total number of RPCs completed on the server, regardless of success or failure.",
},
grpcMetricsAllLabels,
)

// Description of the gRPC handled duration metric:
var grpcHandledDurationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: grpcMetricsSubsystem,
Name: handledDurationMetric,
Help: "Histogram of response latency (seconds) by the server.",
Buckets: prometheus.DefBuckets,
},
grpcMetricsLabels,
)

// Description of the gRPC message received count metric:
var grpcMsgReceivedCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: msgReceivedCountMetric,
Help: "Total number of cloudevents received on the server.",
},
grpcMetricsLabels,
)

// Description of the gRPC message sent count metric:
var grpcMsgSentCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: grpcMetricsSubsystem,
Name: msgSentCountMetric,
Help: "Total number of cloudevents sent by the server.",
},
grpcMetricsLabels,
)
50 changes: 22 additions & 28 deletions cmd/maestro/server/metrics_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ import (
"github.com/prometheus/client_golang/prometheus"
)

func init() {
// Register the metrics:
prometheus.MustRegister(requestCountMetric)
prometheus.MustRegister(requestDurationMetric)
}

// MetricsMiddleware creates a new handler that collects metrics for the requests processed by the
// given handler.
func MetricsMiddleware(handler http.Handler) http.Handler {
Expand Down Expand Up @@ -86,9 +92,9 @@ func MetricsMiddleware(handler http.Handler) http.Handler {

// Create the set of labels that we will add to all the requests:
labels := prometheus.Labels{
metricsMethodLabel: r.Method,
metricsPathLabel: path,
metricsCodeLabel: strconv.Itoa(wrapper.code),
restMetricsMethodLabel: r.Method,
restMetricsPathLabel: path,
restMetricsCodeLabel: strconv.Itoa(wrapper.code),
}

// Update the metric containing the number of requests:
Expand All @@ -112,20 +118,20 @@ var metricsPathVarRE = regexp.MustCompile(`{[^}]*}`)
var PathVarSub = "-"

// Subsystem used to define the metrics:
const metricsSubsystem = "api_inbound"
const restMetricsSubsystem = "rest_api_inbound"

// Names of the labels added to metrics:
const (
metricsMethodLabel = "method"
metricsPathLabel = "path"
metricsCodeLabel = "code"
restMetricsMethodLabel = "method"
restMetricsPathLabel = "path"
restMetricsCodeLabel = "code"
)

// MetricsLabels - Array of labels added to metrics:
var MetricsLabels = []string{
metricsMethodLabel,
metricsPathLabel,
metricsCodeLabel,
// restMetricsLabels - Array of labels added to metrics:
var restMetricsLabels = []string{
restMetricsMethodLabel,
restMetricsPathLabel,
restMetricsCodeLabel,
}

// Names of the metrics:
Expand All @@ -134,26 +140,20 @@ const (
requestDuration = "request_duration"
)

// MetricsNames - Array of Names of the metrics:
var MetricsNames = []string{
requestCount,
requestDuration,
}

// Description of the requests count metric:
var requestCountMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: metricsSubsystem,
Subsystem: restMetricsSubsystem,
Name: requestCount,
Help: "Number of requests served.",
},
MetricsLabels,
restMetricsLabels,
)

// Description of the request duration metric:
var requestDurationMetric = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Subsystem: metricsSubsystem,
Subsystem: restMetricsSubsystem,
Name: requestDuration,
Help: "Request duration in seconds.",
Buckets: []float64{
Expand All @@ -163,7 +163,7 @@ var requestDurationMetric = prometheus.NewHistogramVec(
30.0,
},
},
MetricsLabels,
restMetricsLabels,
)

// metricsResponseWrapper is an extension of the HTTP response writer that remembers the status code,
Expand All @@ -189,9 +189,3 @@ func (w *metricsResponseWrapper) WriteHeader(code int) {
w.code = code
w.wrapped.WriteHeader(code)
}

func init() {
// Register the metrics:
prometheus.MustRegister(requestCountMetric)
prometheus.MustRegister(requestDurationMetric)
}
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ require (
github.com/openshift-online/ocm-common v0.0.0-20240620110211-2ecfa6ec5707
github.com/openshift-online/ocm-sdk-go v0.1.421
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/prometheus/common v0.45.0
github.com/segmentio/ksuid v1.0.2
github.com/spf13/cobra v1.8.0
github.com/spf13/pflag v1.0.5
Expand Down Expand Up @@ -112,8 +114,6 @@ require (
github.com/openshift/library-go v0.0.0-20240621150525-4bb4238aef81 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pkg/profile v1.3.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/db/advisory_locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type (

const (
Migrations LockType = "migrations"
Resources LockType = "Resources"
ResourceStatus LockType = "ResourceStatus"
Resources LockType = "resources"
ResourceStatus LockType = "resource_status"
Events LockType = "events"
Instances LockType = "instances"
)
Expand Down
Loading

0 comments on commit d33e9a2

Please sign in to comment.