diff --git a/cmd/maestro/server/auth_interceptor.go b/cmd/maestro/server/auth_interceptor.go index 892f9e05..1faef6c5 100644 --- a/cmd/maestro/server/auth_interceptor.go +++ b/cmd/maestro/server/auth_interceptor.go @@ -119,20 +119,22 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut } } -// wrappedStream wraps a grpc.ServerStream associated with an incoming RPC, and +// wrappedAuthStream wraps a grpc.ServerStream associated with an incoming RPC, and // a custom context containing the user and groups derived from the client certificate // specified in the incoming RPC metadata -type wrappedStream struct { +type wrappedAuthStream struct { grpc.ServerStream ctx context.Context } -func (w *wrappedStream) Context() context.Context { +// Context returns the context associated with the stream +func (w *wrappedAuthStream) Context() context.Context { return w.ctx } -func newWrappedStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream { - return &wrappedStream{s, ctx} +// newWrappedAuthStream creates a new wrappedAuthStream +func newWrappedAuthStream(ctx context.Context, s grpc.ServerStream) grpc.ServerStream { + return &wrappedAuthStream{s, ctx} } // newAuthStreamInterceptor creates a stream interceptor that retrieves the user and groups @@ -167,6 +169,6 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu return fmt.Errorf("unsupported authentication Type %s", authNType) } - return handler(srv, newWrappedStream(newContextWithIdentity(ss.Context(), user, groups), ss)) + return handler(srv, newWrappedAuthStream(newContextWithIdentity(ss.Context(), user, groups), ss)) } } diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index 6b1f7469..3905275d 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -80,6 +80,10 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e MaxVersion: tls.VersionTLS13, } + grpcServerOptions = append(grpcServerOptions, + grpc.ChainUnaryInterceptor(newMetricsUnaryInterceptor(), newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)), + grpc.ChainStreamInterceptor(newMetricsStreamInterceptor(), newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer))) + if config.GRPCAuthNType == "mtls" { if len(config.ClientCAFile) == 0 { check(fmt.Errorf("no client CA file specified when using mtls authorization type"), "Can't start gRPC server") @@ -102,14 +106,10 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e tlsConfig.ClientCAs = certPool tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert - grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)), - grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)), - grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer))) + grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig))) glog.Infof("Serving gRPC service with mTLS at %s", config.ServerBindPort) } else { - grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)), - grpc.UnaryInterceptor(newAuthUnaryInterceptor(config.GRPCAuthNType, grpcAuthorizer)), - grpc.StreamInterceptor(newAuthStreamInterceptor(config.GRPCAuthNType, grpcAuthorizer))) + grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig))) glog.Infof("Serving gRPC service with TLS at %s", config.ServerBindPort) } } else { diff --git a/cmd/maestro/server/metrics_interceptor.go b/cmd/maestro/server/metrics_interceptor.go new file mode 100644 index 00000000..b84b6582 --- /dev/null +++ b/cmd/maestro/server/metrics_interceptor.go @@ -0,0 +1,243 @@ +package server + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/cloudevents/sdk-go/v2/binding" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/status" + pbv1 "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1" + grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" +) + +func init() { + // Register the metrics: + RegisterGRPCMetrics() +} + +// NewMetricsUnaryInterceptor creates a unary server interceptor for server metrics. +// Currently supports the Publish method with PublishRequest. +func newMetricsUnaryInterceptor() grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // extract the type from the method name + methodInfo := strings.Split(info.FullMethod, "/") + if len(methodInfo) != 3 || methodInfo[2] != "Publish" { + return nil, fmt.Errorf("invalid method name: %s", info.FullMethod) + } + t := methodInfo[2] + pubReq, ok := req.(*pbv1.PublishRequest) + if !ok { + return nil, fmt.Errorf("invalid request type for Publish method") + } + // convert the request to cloudevent and extract the source + evt, err := binding.ToEvent(ctx, grpcprotocol.NewMessage(pubReq.Event)) + if err != nil { + return nil, fmt.Errorf("failed to convert to cloudevent: %v", err) + } + source := evt.Source() + grpcCalledCountMetric.WithLabelValues(t, source).Inc() + + grpcMessageReceivedCountMetric.WithLabelValues(t, source).Inc() + startTime := time.Now() + resp, err := handler(ctx, req) + duration := time.Since(startTime).Seconds() + grpcMessageSentCountMetric.WithLabelValues(t, source).Inc() + + // get status code from error + status := statusFromError(err) + code := status.Code() + grpcProcessedCountMetric.WithLabelValues(t, source, code.String()).Inc() + grpcProcessedDurationMetric.WithLabelValues(t, source).Observe(duration) + + return resp, err + } +} + +// wrappedMetricsStream wraps a grpc.ServerStream, capturing the request source +// emitting metrics for the stream interceptor. +type wrappedMetricsStream struct { + t string + source *string + grpc.ServerStream + ctx context.Context +} + +// RecvMsg wraps the RecvMsg method of the embedded grpc.ServerStream. +// It captures the source from the SubscriptionRequest and emits metrics. +func (w *wrappedMetricsStream) RecvMsg(m interface{}) error { + err := w.ServerStream.RecvMsg(m) + subReq, ok := m.(*pbv1.SubscriptionRequest) + if !ok { + return fmt.Errorf("invalid request type for Subscribe method") + } + *w.source = subReq.Source + grpcMessageReceivedCountMetric.WithLabelValues(w.t, subReq.Source).Inc() + + return err +} + +// SendMsg wraps the SendMsg method of the embedded grpc.ServerStream. +func (w *wrappedMetricsStream) SendMsg(m interface{}) error { + err := w.ServerStream.SendMsg(m) + grpcMessageSentCountMetric.WithLabelValues(w.t, *w.source).Inc() + return err +} + +// newWrappedMetricsStream creates a wrappedMetricsStream with the specified type and source reference. +func newWrappedMetricsStream(t string, source *string, ctx context.Context, ss grpc.ServerStream) grpc.ServerStream { + return &wrappedMetricsStream{t, source, ss, ctx} +} + +// newMetricsStreamInterceptor creates a stream server interceptor for server metrics. +// Currently supports the Subscribe method with SubscriptionRequest. +func newMetricsStreamInterceptor() grpc.StreamServerInterceptor { + return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + // extract the type from the method name + if !info.IsServerStream || info.IsClientStream { + return fmt.Errorf("invalid stream type for stream method: %s", info.FullMethod) + } + methodInfo := strings.Split(info.FullMethod, "/") + if len(methodInfo) != 3 || methodInfo[2] != "Subscribe" { + return fmt.Errorf("invalid method name for stream method: %s", info.FullMethod) + } + t := methodInfo[2] + source := "" + + // create a wrapped stream to capture the source and emit metrics + wrappedMetricsStream := newWrappedMetricsStream(t, &source, stream.Context(), stream) + err := handler(srv, wrappedMetricsStream) + + // get status code from error + status := statusFromError(err) + code := status.Code() + grpcProcessedCountMetric.WithLabelValues(t, source, code.String()).Inc() + grpcCalledCountMetric.WithLabelValues(t, source).Inc() + + return err + } +} + +// statusFromError returns a grpc status. If the error code is neither a valid grpc status +// nor a context error, codes.Unknown will be set. +func statusFromError(err error) *status.Status { + s, ok := status.FromError(err) + // Mirror what the grpc server itself does, i.e. also convert context errors to status + if !ok { + s = status.FromContextError(err) + } + return s +} + +// Subsystem used to define the metrics: +const grpcMetricsSubsystem = "grpc_server" + +// Names of the labels added to metrics: +const ( + grpcMetricsTypeLabel = "type" + grpcMetricsSourceLabel = "source" + grpcMetricsCodeLabel = "code" +) + +// grpcMetricsLabels - Array of labels added to metrics: +var grpcMetricsLabels = []string{ + grpcMetricsTypeLabel, + grpcMetricsSourceLabel, +} + +// grpcMetricsAllLabels - Array of all labels added to metrics: +var grpcMetricsAllLabels = []string{ + grpcMetricsTypeLabel, + grpcMetricsSourceLabel, + grpcMetricsCodeLabel, +} + +// Names of the metrics: +const ( + calledCountMetric = "called_total" + processedCountMetric = "processed_total" + processedDurationMetric = "processed_duration_seconds" + messageReceivedCountMetric = "message_received_total" + messageSentCountMetric = "message_sent_total" +) + +// Register the metrics: +func RegisterGRPCMetrics() { + prometheus.MustRegister(grpcCalledCountMetric) + prometheus.MustRegister(grpcProcessedCountMetric) + prometheus.MustRegister(grpcProcessedDurationMetric) + prometheus.MustRegister(grpcMessageReceivedCountMetric) + prometheus.MustRegister(grpcMessageSentCountMetric) +} + +// Unregister the metrics: +func UnregisterGRPCMetrics() { + prometheus.Unregister(grpcCalledCountMetric) + prometheus.Unregister(grpcProcessedCountMetric) + prometheus.Unregister(grpcProcessedDurationMetric) + prometheus.Unregister(grpcMessageReceivedCountMetric) + prometheus.Unregister(grpcMessageSentCountMetric) +} + +// Reset the metrics: +func ResetGRPCMetrics() { + grpcCalledCountMetric.Reset() + grpcProcessedCountMetric.Reset() + grpcProcessedDurationMetric.Reset() + grpcMessageReceivedCountMetric.Reset() + grpcMessageSentCountMetric.Reset() +} + +// Description of the gRPC called count metric: +var grpcCalledCountMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: grpcMetricsSubsystem, + Name: calledCountMetric, + Help: "Total number of RPCs called on the server.", + }, + grpcMetricsLabels, +) + +// Description of the gRPC processed count metric: +var grpcProcessedCountMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: grpcMetricsSubsystem, + Name: processedCountMetric, + Help: "Total number of RPCs processed on the server, regardless of success or failure.", + }, + grpcMetricsAllLabels, +) + +// Description of the gRPC processed duration metric: +var grpcProcessedDurationMetric = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Subsystem: grpcMetricsSubsystem, + Name: processedDurationMetric, + Help: "Histogram of the duration of RPCs processed on the server.", + Buckets: prometheus.DefBuckets, + }, + grpcMetricsLabels, +) + +// Description of the gRPC message received count metric: +var grpcMessageReceivedCountMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: grpcMetricsSubsystem, + Name: messageReceivedCountMetric, + Help: "Total number of messages received on the server from agent and client.", + }, + grpcMetricsLabels, +) + +// Description of the gRPC message sent count metric: +var grpcMessageSentCountMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: grpcMetricsSubsystem, + Name: messageSentCountMetric, + Help: "Total number of messages sent by the server to agent and client.", + }, + grpcMetricsLabels, +) diff --git a/cmd/maestro/server/metrics_middleware.go b/cmd/maestro/server/metrics_middleware.go index 448593f1..517a188a 100755 --- a/cmd/maestro/server/metrics_middleware.go +++ b/cmd/maestro/server/metrics_middleware.go @@ -58,6 +58,12 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +func init() { + // Register the metrics: + prometheus.MustRegister(requestCountMetric) + prometheus.MustRegister(requestDurationMetric) +} + // MetricsMiddleware creates a new handler that collects metrics for the requests processed by the // given handler. func MetricsMiddleware(handler http.Handler) http.Handler { @@ -86,9 +92,9 @@ func MetricsMiddleware(handler http.Handler) http.Handler { // Create the set of labels that we will add to all the requests: labels := prometheus.Labels{ - metricsMethodLabel: r.Method, - metricsPathLabel: path, - metricsCodeLabel: strconv.Itoa(wrapper.code), + restMetricsMethodLabel: r.Method, + restMetricsPathLabel: path, + restMetricsCodeLabel: strconv.Itoa(wrapper.code), } // Update the metric containing the number of requests: @@ -112,20 +118,20 @@ var metricsPathVarRE = regexp.MustCompile(`{[^}]*}`) var PathVarSub = "-" // Subsystem used to define the metrics: -const metricsSubsystem = "api_inbound" +const restMetricsSubsystem = "rest_api_inbound" // Names of the labels added to metrics: const ( - metricsMethodLabel = "method" - metricsPathLabel = "path" - metricsCodeLabel = "code" + restMetricsMethodLabel = "method" + restMetricsPathLabel = "path" + restMetricsCodeLabel = "code" ) -// MetricsLabels - Array of labels added to metrics: -var MetricsLabels = []string{ - metricsMethodLabel, - metricsPathLabel, - metricsCodeLabel, +// restMetricsLabels - Array of labels added to metrics: +var restMetricsLabels = []string{ + restMetricsMethodLabel, + restMetricsPathLabel, + restMetricsCodeLabel, } // Names of the metrics: @@ -134,26 +140,20 @@ const ( requestDuration = "request_duration" ) -// MetricsNames - Array of Names of the metrics: -var MetricsNames = []string{ - requestCount, - requestDuration, -} - // Description of the requests count metric: var requestCountMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ - Subsystem: metricsSubsystem, + Subsystem: restMetricsSubsystem, Name: requestCount, Help: "Number of requests served.", }, - MetricsLabels, + restMetricsLabels, ) // Description of the request duration metric: var requestDurationMetric = prometheus.NewHistogramVec( prometheus.HistogramOpts{ - Subsystem: metricsSubsystem, + Subsystem: restMetricsSubsystem, Name: requestDuration, Help: "Request duration in seconds.", Buckets: []float64{ @@ -163,7 +163,7 @@ var requestDurationMetric = prometheus.NewHistogramVec( 30.0, }, }, - MetricsLabels, + restMetricsLabels, ) // metricsResponseWrapper is an extension of the HTTP response writer that remembers the status code, @@ -189,9 +189,3 @@ func (w *metricsResponseWrapper) WriteHeader(code int) { w.code = code w.wrapped.WriteHeader(code) } - -func init() { - // Register the metrics: - prometheus.MustRegister(requestCountMetric) - prometheus.MustRegister(requestDurationMetric) -} diff --git a/go.mod b/go.mod index 1e6b776e..9d646cab 100755 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( github.com/openshift-online/ocm-sdk-go v0.1.421 github.com/openshift/library-go v0.0.0-20240621150525-4bb4238aef81 github.com/prometheus/client_golang v1.18.0 + github.com/prometheus/client_model v0.5.0 + github.com/prometheus/common v0.45.0 github.com/segmentio/ksuid v1.0.2 github.com/spf13/cobra v1.8.0 github.com/spf13/pflag v1.0.5 @@ -115,8 +117,6 @@ require ( github.com/openshift/client-go v0.0.0-20240528061634-b054aa794d87 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pkg/profile v1.3.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect - github.com/prometheus/common v0.45.0 // indirect github.com/prometheus/procfs v0.12.0 // indirect github.com/robfig/cron v1.2.0 // indirect github.com/rogpeppe/go-internal v1.11.0 // indirect diff --git a/pkg/db/advisory_locks.go b/pkg/db/advisory_locks.go index 6872640c..5541c1a5 100755 --- a/pkg/db/advisory_locks.go +++ b/pkg/db/advisory_locks.go @@ -19,8 +19,8 @@ type ( const ( Migrations LockType = "migrations" - Resources LockType = "Resources" - ResourceStatus LockType = "ResourceStatus" + Resources LockType = "resources" + ResourceStatus LockType = "resource_status" Events LockType = "events" Instances LockType = "instances" ) diff --git a/pkg/db/metrics_collector.go b/pkg/db/metrics_collector.go index 9f46c64d..288e5049 100755 --- a/pkg/db/metrics_collector.go +++ b/pkg/db/metrics_collector.go @@ -6,6 +6,11 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +func init() { + // Register the metrics for advisory locks + RegisterAdvisoryLockMetrics() +} + type MetricsCollector interface { } @@ -18,8 +23,8 @@ const ( metricsStatusLabel = "status" ) -// MetricsLabels - Array of labels added to metrics: -var MetricsLabels = []string{ +// metricsLabels - Array of labels added to metrics: +var metricsLabels = []string{ metricsTypeLabel, metricsStatusLabel, } @@ -30,12 +35,6 @@ const ( durationMetric = "duration" ) -// MetricsNames - Array of Names of the metrics: -var MetricsNames = []string{ - countMetric, - durationMetric, -} - // Description of the requests count metric: var advisoryLockCountMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -43,7 +42,7 @@ var advisoryLockCountMetric = prometheus.NewCounterVec( Name: countMetric, Help: "Number of advisory lock requests.", }, - MetricsLabels, + metricsLabels, ) // Description of the request duration metric: @@ -61,7 +60,7 @@ var advisoryLockDurationMetric = prometheus.NewHistogramVec( 10.0, }, }, - MetricsLabels, + metricsLabels, ) // Register the metrics: @@ -76,7 +75,7 @@ func UnregisterAdvisoryLockMetrics() { prometheus.Unregister(advisoryLockDurationMetric) } -// ResetMetricCollectors resets all collectors +// ResetAdvisoryLockMetricsCollectors resets all collectors func ResetAdvisoryLockMetricsCollectors() { advisoryLockCountMetric.Reset() advisoryLockDurationMetric.Reset() diff --git a/pkg/services/resource.go b/pkg/services/resource.go index 68d3d6dc..d596c60e 100755 --- a/pkg/services/resource.go +++ b/pkg/services/resource.go @@ -10,6 +10,7 @@ import ( "github.com/openshift-online/maestro/pkg/dao" "github.com/openshift-online/maestro/pkg/db" logger "github.com/openshift-online/maestro/pkg/logger" + "github.com/prometheus/client_golang/prometheus" cegeneric "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" cetypes "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -19,6 +20,11 @@ import ( "github.com/openshift-online/maestro/pkg/errors" ) +func init() { + // Register the metrics for resource service + RegisterResourceMetrics() +} + type ResourceService interface { Get(ctx context.Context, id string) (*api.Resource, *errors.ServiceError) Create(ctx context.Context, resource *api.Resource) (*api.Resource, *errors.ServiceError) @@ -142,6 +148,15 @@ func (s *sqlResourceService) Update(ctx context.Context, resource *api.Resource) return nil, handleUpdateError("Resource", err) } + // Create the set of labels that we will add to all the resource process: + labels := prometheus.Labels{ + metricsIDLabel: updated.ID, + metricsActionLabel: "update", + } + + // Update the metric containing the number of processed resources: + resourceProcessedCountMetric.With(labels).Inc() + return updated, nil } @@ -215,6 +230,15 @@ func (s *sqlResourceService) UpdateStatus(ctx context.Context, resource *api.Res return nil, false, handleUpdateError("Resource", err) } + // Create the set of labels that we will add to all the resource process: + labels := prometheus.Labels{ + metricsIDLabel: updated.ID, + metricsActionLabel: "update", + } + + // Update the metric containing the number of processed resources: + resourceProcessedCountMetric.With(labels).Inc() + return updated, true, nil } @@ -334,3 +358,48 @@ func (s *sqlResourceService) syncTimestampsFromResourceMeta(resource *api.Resour } } } + +// Subsystem used to define the metrics: +const metricsSubsystem = "resource" + +// Names of the labels added to metrics: +const ( + metricsIDLabel = "id" + metricsActionLabel = "action" +) + +// metricsLabels - Array of labels added to metrics: +var metricsLabels = []string{ + metricsIDLabel, + metricsActionLabel, +} + +// Names of the metrics: +const ( + processedCountMetric = "processed_total" +) + +// Register the metrics: +func RegisterResourceMetrics() { + prometheus.MustRegister(resourceProcessedCountMetric) +} + +// Unregister the metrics: +func UnregisterResourceMetrics() { + prometheus.Unregister(resourceProcessedCountMetric) +} + +// Reset the metrics: +func ResetResourceMetrics() { + resourceProcessedCountMetric.Reset() +} + +// Description of the resource process count metric: +var resourceProcessedCountMetric = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Subsystem: metricsSubsystem, + Name: processedCountMetric, + Help: "Number of processed resources.", + }, + metricsLabels, +) diff --git a/test/integration/resource_test.go b/test/integration/resource_test.go index 5e9d86ce..2bf5230b 100755 --- a/test/integration/resource_test.go +++ b/test/integration/resource_test.go @@ -4,13 +4,17 @@ import ( "context" "encoding/json" "fmt" + "io" "net/http" + "strings" "sync" "testing" "time" "github.com/google/uuid" . "github.com/onsi/gomega" + prommodel "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" "gopkg.in/resty.v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -182,6 +186,19 @@ func TestResourcePost(t *testing.T) { Expect(contentStatus["availableReplicas"]).To(Equal(float64(1))) Expect(contentStatus["readyReplicas"]).To(Equal(float64(1))) Expect(contentStatus["updatedReplicas"]).To(Equal(float64(1))) + + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("id"), Value: resource.Id}, + {Name: strPtr("action"), Value: strPtr("create")}, + } + checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0) + labels = []*prommodel.LabelPair{ + {Name: strPtr("id"), Value: resource.Id}, + {Name: strPtr("action"), Value: strPtr("update")}, + } + checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0) } func TestResourcePostWithoutName(t *testing.T) { @@ -222,6 +239,14 @@ func TestResourcePostWithoutName(t *testing.T) { return nil }, 10*time.Second, 1*time.Second).Should(Succeed()) + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("id"), Value: resource.Id}, + {Name: strPtr("action"), Value: strPtr("create")}, + } + checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0) + // make sure controller manager and work agent are stopped cancel() } @@ -251,6 +276,14 @@ func TestResourcePostWithName(t *testing.T) { _, _, err = client.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() Expect(err).To(HaveOccurred()) + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("id"), Value: resource.Id}, + {Name: strPtr("action"), Value: strPtr("create")}, + } + checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0) + // make sure controller manager and work agent are stopped cancel() } @@ -365,15 +398,15 @@ func TestResourcePatch(t *testing.T) { openapi.ResourcePatchRequest{Version: &res.Version, Manifest: newRes.Manifest}).Execute() Expect(err).To(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusConflict)) -} -func contains(et api.EventType, events api.EventList) bool { - for _, e := range events { - if e.EventType == et { - return true - } + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + + labels := []*prommodel.LabelPair{ + {Name: strPtr("id"), Value: resource.Id}, + {Name: strPtr("action"), Value: strPtr("update")}, } - return false + checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0) } func TestResourcePaging(t *testing.T) { @@ -452,6 +485,15 @@ func TestResourceBundleGet(t *testing.T) { Expect(*resBundle.CreatedAt).To(BeTemporally("~", resourceBundle.CreatedAt)) Expect(*resBundle.UpdatedAt).To(BeTemporally("~", resourceBundle.UpdatedAt)) Expect(*resBundle.Version).To(Equal(resourceBundle.Version)) + + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("method"), Value: strPtr("GET")}, + {Name: strPtr("path"), Value: strPtr("/api/maestro/v1/resource-bundles/-")}, + {Name: strPtr("code"), Value: strPtr("200")}, + } + checkServerCounterMetric(t, families, "rest_api_inbound_request_count", labels, 1.0) } func TestResourceBundleListSearch(t *testing.T) { @@ -479,6 +521,15 @@ func TestResourceBundleListSearch(t *testing.T) { Expect(err).NotTo(HaveOccurred(), "Error getting resource bundle list: %v", err) Expect(len(list.Items)).To(Equal(20)) Expect(list.Total).To(Equal(int32(20))) + + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("method"), Value: strPtr("GET")}, + {Name: strPtr("path"), Value: strPtr("/api/maestro/v1/resource-bundles")}, + {Name: strPtr("code"), Value: strPtr("200")}, + } + checkServerCounterMetric(t, families, "rest_api_inbound_request_count", labels, 2.0) } func TestUpdateResourceWithRacingRequests(t *testing.T) { @@ -755,6 +806,37 @@ func TestResourceFromGRPC(t *testing.T) { return nil }, 10*time.Second, 1*time.Second).Should(Succeed()) + // check the metrics + families := getServerMetrics(t, "http://localhost:8080/metrics") + labels := []*prommodel.LabelPair{ + {Name: strPtr("type"), Value: strPtr("Publish")}, + {Name: strPtr("source"), Value: strPtr("maestro")}, + } + checkServerCounterMetric(t, families, "maestro_grpc_server_started_total", labels, 3.0) + checkServerCounterMetric(t, families, "maestro_grpc_server_msg_received_total", labels, 3.0) + checkServerCounterMetric(t, families, "maestro_grpc_server_msg_sent_total", labels, 3.0) + + labels = []*prommodel.LabelPair{ + {Name: strPtr("type"), Value: strPtr("Subscribe")}, + {Name: strPtr("source"), Value: strPtr("maestro")}, + } + checkServerCounterMetric(t, families, "maestro_grpc_server_started_total", labels, 1.0) + checkServerCounterMetric(t, families, "maestro_grpc_server_msg_received_total", labels, 1.0) + //checkServerCounterMetric(t, families, "maestro_grpc_server_msg_sent_total", labels, 2.0) + + labels = []*prommodel.LabelPair{ + {Name: strPtr("type"), Value: strPtr("Publish")}, + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("code"), Value: strPtr("OK")}, + } + checkServerCounterMetric(t, families, "maestro_grpc_server_handled_total", labels, 3.0) + + labels = []*prommodel.LabelPair{ + {Name: strPtr("type"), Value: strPtr("Subscribe")}, + {Name: strPtr("source"), Value: strPtr("maestro")}, + {Name: strPtr("code"), Value: strPtr("OK")}, + } + checkServerCounterMetric(t, families, "maestro_grpc_server_handled_total", labels, 0.0) } func TestResourceBundleFromGRPC(t *testing.T) { @@ -921,3 +1003,79 @@ func TestResourceBundleFromGRPC(t *testing.T) { Expect(len(list.Items)).To(Equal(1)) Expect(list.Total).To(Equal(int32(1))) } + +func contains(et api.EventType, events api.EventList) bool { + for _, e := range events { + if e.EventType == et { + return true + } + } + return false +} + +func getServerMetrics(t *testing.T, url string) map[string]*prommodel.MetricFamily { + // gather metrics from metrics server from url /metrics + resp, err := http.Get(url) + if err != nil { + t.Errorf("Error getting metrics: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + t.Errorf("Error getting metrics with status code: %v", resp.StatusCode) + } + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Errorf("Error reading metrics: %v", err) + } + parser := expfmt.TextParser{} + // Ensure EOL + reader := strings.NewReader(strings.ReplaceAll(string(body), "\r\n", "\n")) + families, err := parser.TextToMetricFamilies(reader) + if err != nil { + t.Errorf("Error parsing metrics: %v", err) + } + + return families +} + +func checkServerCounterMetric(t *testing.T, families map[string]*prommodel.MetricFamily, name string, labels []*prommodel.LabelPair, value float64) { + family, ok := families[name] + if !ok { + t.Errorf("Metric %s not found", name) + } + metricValue := 0.0 + metrics := family.GetMetric() + for _, metric := range metrics { + metricLabels := metric.GetLabel() + if !compareMetricLabels(labels, metricLabels) { + continue + } + metricValue += *metric.Counter.Value + } + if metricValue != value { + t.Errorf("Counter metric %s value is %f, expected %f", name, metricValue, value) + } +} + +func compareMetricLabels(labels []*prommodel.LabelPair, metricLabels []*prommodel.LabelPair) bool { + if len(labels) != len(metricLabels) { + return false + } + for _, label := range labels { + match := false + for _, metricLabel := range metricLabels { + if *label.Name == *metricLabel.Name && *label.Value == *metricLabel.Value { + match = true + break + } + } + if !match { + return false + } + } + return true +} + +func strPtr(s string) *string { + return &s +}