From e3051fd088a88e1c797bc4c94b4a650c0697054b Mon Sep 17 00:00:00 2001 From: Daniel Hrabovcak Date: Tue, 28 May 2024 12:03:56 -0400 Subject: [PATCH] feat(e2e): use a fake metric service for e2e tests --- Makefile | 2 +- cmd/fake-metric-service/Dockerfile | 29 ++ cmd/fake-metric-service/main.go | 123 +++++ e2e/collector_test.go | 17 +- e2e/deploy/deploy.go | 30 +- e2e/deploy/fake.go | 105 +++++ e2e/main_test.go | 49 +- e2e/ruler_test.go | 7 +- hack/kind-test.sh | 5 +- pkg/e2e/fake_metric_server.go | 300 ++++++++++++ pkg/e2e/fake_metric_server_test.go | 705 +++++++++++++++++++++++++++++ pkg/export/export.go | 20 +- 12 files changed, 1345 insertions(+), 47 deletions(-) create mode 100644 cmd/fake-metric-service/Dockerfile create mode 100644 cmd/fake-metric-service/main.go create mode 100644 e2e/deploy/fake.go create mode 100644 pkg/e2e/fake_metric_server.go create mode 100644 pkg/e2e/fake_metric_server_test.go diff --git a/Makefile b/Makefile index f841140b30..29eed63d88 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ endif ifeq ($(KIND_PERSIST), 1) E2E_DOCKER_ARGS += --env KIND_PERSIST=1 endif -E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic +E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic fake-metric-service REGISTRY_NAME=kind-registry REGISTRY_PORT=5001 KIND_PARALLEL?=5 diff --git a/cmd/fake-metric-service/Dockerfile b/cmd/fake-metric-service/Dockerfile new file mode 100644 index 0000000000..169ee9fd0f --- /dev/null +++ b/cmd/fake-metric-service/Dockerfile @@ -0,0 +1,29 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM google-go.pkg.dev/golang:1.22.3@sha256:efc6b824652199ede8bc25e2d5a57076e0e1ffbe90735dcbda3ee956fe33b612 as buildbase +WORKDIR /app +COPY go.mod go.mod +COPY go.sum go.sum +COPY cmd cmd +COPY pkg pkg +COPY vendor vendor + +FROM buildbase as appbase +RUN CGO_ENABLED=1 GOEXPERIMENT=boringcrypto \ + go build -tags boring -mod=vendor -o fake-metric-service cmd/fake-metric-service/*.go + +FROM gke.gcr.io/gke-distroless/libc:gke_distroless_20240307.00_p0@sha256:4f834e207f2721977094aeec4c9daee7032c5daec2083c0be97760f4306e4f88 +COPY --from=appbase /app/fake-metric-service /bin/fake-metric-service +ENTRYPOINT ["/bin/fake-metric-service"] diff --git a/cmd/fake-metric-service/main.go b/cmd/fake-metric-service/main.go new file mode 100644 index 0000000000..aa3ca49c35 --- /dev/null +++ b/cmd/fake-metric-service/main.go @@ -0,0 +1,123 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "net" + "net/http" + "os" + "sync" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/GoogleCloudPlatform/prometheus-engine/pkg/e2e" + "github.com/go-logr/logr" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" +) + +func main() { + logVerbosity := 0 + probeAddr := ":8080" + metricServiceAddr := ":8081" + + flag.IntVar(&logVerbosity, "v", logVerbosity, "Logging verbosity") + flag.StringVar(&probeAddr, "probe-addr", probeAddr, "Address to outputs probe statuses (e.g. /readyz and /livez)") + flag.StringVar(&metricServiceAddr, "metric-service-addr", metricServiceAddr, "Address to serve a mock metric service server.") + flag.Parse() + + logger := zap.New(zap.Level(zapcore.Level(-logVerbosity))) + ctrl.SetLogger(logger) + + ctx := signals.SetupSignalHandler() + if err := run(ctx, logger, probeAddr, metricServiceAddr); err != nil { + logger.Error(err, "exit with error") + os.Exit(1) + } +} + +func run(ctx context.Context, logger logr.Logger, probeAddr, metricServiceAddr string) error { + listener, err := net.Listen("tcp", metricServiceAddr) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + errs := make(chan error, 1) + + logger.Info("starting server...") + + { + s := grpc.NewServer() + monitoringpb.RegisterMetricServiceServer(s, e2e.NewFakeMetricServer()) + + wg.Add(1) + go func() { + for range ctx.Done() { + s.GracefulStop() + return + } + }() + go func() { + defer wg.Done() + if err := s.Serve(listener); err != nil { + errs <- err + } + }() + } + + { + wg.Add(1) + mux := http.NewServeMux() + mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + mux.Handle("/livez", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + server := http.Server{ + Addr: probeAddr, + Handler: mux, + } + go func() { + for range ctx.Done() { + // Start new context because ours is done. + if err := server.Shutdown(context.Background()); err != nil { + errs <- err + } + } + }() + go func() { + defer wg.Done() + if err := server.ListenAndServe(); err != nil { + errs <- err + } + }() + } + + go func() { + wg.Wait() + close(errs) + }() + + for err := range errs { + return err + } + return nil +} diff --git a/e2e/collector_test.go b/e2e/collector_test.go index ae4b27b8ef..c71d559f8f 100644 --- a/e2e/collector_test.go +++ b/e2e/collector_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - gcm "cloud.google.com/go/monitoring/apiv3/v2" gcmpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/GoogleCloudPlatform/prometheus-engine/e2e/kube" monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1" @@ -79,9 +78,7 @@ func TestCollectorPodMonitoring(t *testing.T) { }, } t.Run("self-podmonitoring-ready", testEnsurePodMonitoringReady(ctx, kubeClient, pm)) - if !skipGCM { - t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-podmon")) - } + t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-podmon")) } func TestCollectorClusterPodMonitoring(t *testing.T) { @@ -120,9 +117,7 @@ func TestCollectorClusterPodMonitoring(t *testing.T) { }, } t.Run("self-clusterpodmonitoring-ready", testEnsureClusterPodMonitoringReady(ctx, kubeClient, cpm)) - if !skipGCM { - t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-cmon")) - } + t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-cmon")) } func TestCollectorKubeletScraping(t *testing.T) { @@ -138,9 +133,7 @@ func TestCollectorKubeletScraping(t *testing.T) { t.Run("collector-operatorconfig", testCollectorOperatorConfig(ctx, kubeClient)) t.Run("enable-kubelet-scraping", testEnableKubeletScraping(ctx, kubeClient)) - if !skipGCM { - t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, kubeClient)) - } + t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, kubeClient)) } // testCollectorDeployed does a high-level verification on whether the @@ -445,7 +438,7 @@ func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Clien t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx) if err != nil { t.Fatalf("create metric client: %s", err) } @@ -524,7 +517,7 @@ func testCollectorScrapeKubelet(ctx context.Context, kubeClient client.Client) f t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx) if err != nil { t.Fatalf("create GCM metric client: %s", err) } diff --git a/e2e/deploy/deploy.go b/e2e/deploy/deploy.go index c3c0254208..8e32df63e4 100644 --- a/e2e/deploy/deploy.go +++ b/e2e/deploy/deploy.go @@ -55,6 +55,7 @@ type deployOptions struct { cluster string location string disableGCM bool + gcmEndpoint string } func (opts *deployOptions) setDefaults() { @@ -92,6 +93,12 @@ func WithDisableGCM(disableGCM bool) DeployOption { } } +func WithGCMService(gcmService string) DeployOption { + return func(opts *deployOptions) { + opts.gcmEndpoint = gcmService + } +} + func createResources(ctx context.Context, kubeClient client.Client, normalizeFn func(client.Object) (client.Object, error)) error { resources, err := resources(kubeClient.Scheme()) if err != nil { @@ -130,16 +137,18 @@ func resources(scheme *runtime.Scheme) ([]client.Object, error) { } func normalizeDeamonSets(opts *deployOptions, obj *appsv1.DaemonSet) (client.Object, error) { - if !opts.disableGCM { - return obj, nil - } if obj.GetName() != operator.NameCollector { return obj, nil } for i := range obj.Spec.Template.Spec.Containers { container := &obj.Spec.Template.Spec.Containers[i] if container.Name == operator.CollectorPrometheusContainerName { - container.Args = append(container.Args, "--export.debug.disable-auth") + if opts.disableGCM || opts.gcmEndpoint != "" { + container.Args = append(container.Args, "--export.debug.disable-auth") + } + if opts.gcmEndpoint != "" { + container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint)) + } return obj, nil } } @@ -169,15 +178,18 @@ func normalizeDeployments(opts *deployOptions, obj *appsv1.Deployment) (client.O container.Args = append(container.Args, fmt.Sprintf("--public-namespace=%s", opts.publicNamespace)) } case operator.NameRuleEvaluator: - if !opts.disableGCM { - break - } container, err := kube.DeploymentContainer(obj, operator.RuleEvaluatorContainerName) if err != nil { return nil, fmt.Errorf("unable to find rule-evaluator %q container: %w", operator.RuleEvaluatorContainerName, err) } - container.Args = append(container.Args, "--export.debug.disable-auth") - container.Args = append(container.Args, "--query.debug.disable-auth") + if opts.disableGCM || opts.gcmEndpoint != "" { + container.Args = append(container.Args, "--export.debug.disable-auth") + container.Args = append(container.Args, "--query.debug.disable-auth") + } + if opts.gcmEndpoint != "" { + container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint)) + container.Args = append(container.Args, fmt.Sprintf("--query.target-url=%s", opts.gcmEndpoint)) + } } return obj, nil } diff --git a/e2e/deploy/fake.go b/e2e/deploy/fake.go new file mode 100644 index 0000000000..7e26554556 --- /dev/null +++ b/e2e/deploy/fake.go @@ -0,0 +1,105 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deploy + +import ( + "context" + "fmt" + + "github.com/GoogleCloudPlatform/prometheus-engine/e2e/kube" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func CreateFakeMetricService(ctx context.Context, kubeClient client.Client, namespace, name, image string) error { + labels := map[string]string{ + "app.kubernetes.io/name": "fake-metric-service", + } + if err := kubeClient.Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "server", + Image: image, + Ports: []corev1.ContainerPort{ + { + Name: "metric-service", + ContainerPort: 8081, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "livez", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "readyz", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + }, + }, + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service deployment: %w", err) + } + if err := kubeClient.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Port: 8080, + TargetPort: intstr.FromString("metric-service"), + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service service: %w", err) + } + return kube.WaitForDeploymentReady(ctx, kubeClient, namespace, name) +} + +func FakeMetricServiceEndpoint() string { + return "metric-service.default.svc.cluster.local:8081" +} diff --git a/e2e/main_test.go b/e2e/main_test.go index 915156f3de..98d30c3281 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -26,7 +26,11 @@ import ( "testing" "time" + gcm "cloud.google.com/go/monitoring/apiv3/v2" "go.uber.org/zap/zapcore" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" metav1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" @@ -51,9 +55,12 @@ const ( var ( projectID, location, cluster string - skipGCM bool + fakeGCM bool pollDuration time.Duration + imageTag string + imageRegistryPort int + gcpServiceAccount string ) @@ -64,18 +71,20 @@ func TestMain(m *testing.M) { flag.StringVar(&projectID, "project-id", "", "The GCP project to write metrics to.") flag.StringVar(&location, "location", "", "The location of the Kubernetes cluster that's tested against.") flag.StringVar(&cluster, "cluster", "", "The name of the Kubernetes cluster that's tested against.") - flag.BoolVar(&skipGCM, "skip-gcm", false, "Skip validating GCM ingested points.") + flag.BoolVar(&fakeGCM, "fake-metric-service", true, "Use a fake GCM endpoint.") flag.DurationVar(&pollDuration, "duration", 3*time.Second, "How often to poll and retry for resources.") flag.StringVar(&gcpServiceAccount, "gcp-service-account", "", "Path to GCP service account file for usage by deployed containers.") + flag.StringVar(&imageTag, "image-tag", "", "The tag to use from the local registry.") + flag.IntVar(&imageRegistryPort, "registry-port", -1, "The port of the local registry.") + flag.Parse() os.Exit(m.Run()) } func setupCluster(ctx context.Context, t testing.TB) (client.Client, *rest.Config, error) { - t.Log(">>> deploying static resources") restConfig, err := newRestConfig() if err != nil { return nil, nil, err @@ -86,7 +95,8 @@ func setupCluster(ctx context.Context, t testing.TB) (client.Client, *rest.Confi return nil, nil, err } - if err := createResources(ctx, kubeClient); err != nil { + t.Log(">>> deploying static resources") + if err := createResources(ctx, kubeClient, imageTag, imageRegistryPort); err != nil { return nil, nil, err } @@ -153,8 +163,21 @@ func newScheme() (*runtime.Scheme, error) { return scheme, nil } -func createResources(ctx context.Context, kubeClient client.Client) error { - if err := deploy.CreateResources(ctx, kubeClient, deploy.WithMeta(projectID, cluster, location), deploy.WithDisableGCM(skipGCM)); err != nil { +func createResources(ctx context.Context, kubeClient client.Client, imageTag string, imageRegistryPort int) error { + deployOpts := []deploy.DeployOption{ + deploy.WithMeta(projectID, cluster, location), + } + if fakeGCM { + if imageRegistryPort == -1 { + return fmt.Errorf("--registry-port must be provided with --fake-metric-service") + } + metricServiceImage := fmt.Sprintf("localhost:%d/fake-metric-service:%s", imageRegistryPort, imageTag) + if err := deploy.CreateFakeMetricService(ctx, kubeClient, metav1.NamespaceDefault, "metric-service", metricServiceImage); err != nil { + return err + } + deployOpts = append(deployOpts, deploy.WithGCMService(deploy.FakeMetricServiceEndpoint())) + } + if err := deploy.CreateResources(ctx, kubeClient, deployOpts...); err != nil { return err } @@ -187,3 +210,17 @@ func contextWithDeadline(t *testing.T) context.Context { t.Cleanup(cancel) return ctx } + +func newMetricClient(ctx context.Context) (*gcm.MetricClient, error) { + clientOpts := []option.ClientOption{ + option.WithUserAgent("prometheus-engine-e2e"), + } + if fakeGCM { + clientOpts = append(clientOpts, + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithEndpoint(deploy.FakeMetricServiceEndpoint()), + ) + } + return gcm.NewMetricClient(ctx, clientOpts...) +} diff --git a/e2e/ruler_test.go b/e2e/ruler_test.go index d48e8912d5..5e68377633 100644 --- a/e2e/ruler_test.go +++ b/e2e/ruler_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - gcm "cloud.google.com/go/monitoring/apiv3/v2" gcmpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator" monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1" @@ -85,9 +84,7 @@ func testRuleEvaluator(t *testing.T, features monitoringv1.OperatorFeatures) { t.Run("rule-evaluator-configuration", testRuleEvaluatorConfiguration(ctx, kubeClient)) t.Run("rules-create", testCreateRules(ctx, restConfig, kubeClient, operator.DefaultOperatorNamespace, metav1.NamespaceDefault, features)) - if !skipGCM { - t.Run("rules-gcm", testValidateRuleEvaluationMetrics(ctx)) - } + t.Run("rules-gcm", testValidateRuleEvaluationMetrics(ctx)) } func testRuleEvaluatorDeployed(ctx context.Context, kubeClient client.Client) func(*testing.T) { @@ -606,7 +603,7 @@ func testValidateRuleEvaluationMetrics(ctx context.Context) func(*testing.T) { t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx) if err != nil { t.Fatalf("create metric client: %s", err) } diff --git a/hack/kind-test.sh b/hack/kind-test.sh index 6da3eb81b3..4bf2ba0c45 100755 --- a/hack/kind-test.sh +++ b/hack/kind-test.sh @@ -23,7 +23,7 @@ GO_TEST=$1 REPO_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. TAG_NAME=$(date "+gmp-%Y%d%m_%H%M") -TEST_ARGS="" +TEST_ARGS="-image-tag=${TAG_NAME} -registry-port=${REGISTRY_PORT}" # Convert kind cluster name to required regex if necessary. # We need to ensure this name is not too long due to: https://github.com/kubernetes-sigs/kind/issues/623 # while still unique enough to avoid dups between similar test names when trimming. @@ -36,9 +36,6 @@ KUBECTL="kubectl --context kind-${KIND_CLUSTER}" GMP_CLUSTER=$TAG_NAME if [[ -n "${GOOGLE_APPLICATION_CREDENTIALS:-}" ]]; then PROJECT_ID=$(jq -r '.project_id' "${GOOGLE_APPLICATION_CREDENTIALS}") -else - echo ">>> no credentials specified. running without GCM validation" - TEST_ARGS="${TEST_ARGS} -skip-gcm" fi TEST_ARGS="${TEST_ARGS} -project-id=${PROJECT_ID} -location=${GMP_LOCATION} -cluster=${GMP_CLUSTER}" diff --git a/pkg/e2e/fake_metric_server.go b/pkg/e2e/fake_metric_server.go new file mode 100644 index 0000000000..1c58050daf --- /dev/null +++ b/pkg/e2e/fake_metric_server.go @@ -0,0 +1,300 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "errors" + "fmt" + "regexp" + "strings" + "time" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/types/known/emptypb" + "k8s.io/utils/ptr" +) + +// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create +const defaultMaxTimeSeriesPerRequest = 200 + +func NewFakeMetricServer() monitoringpb.MetricServiceServer { + return newFakeMetricServer() +} + +func newFakeMetricServer() *fakeMetricServer { + return &fakeMetricServer{ + timeSeriesByProject: make(map[string][]*monitoringpb.TimeSeries), + maxTimeSeriesPerRequest: defaultMaxTimeSeriesPerRequest, + } +} + +type fakeMetricServer struct { + monitoringpb.UnimplementedMetricServiceServer + timeSeriesByProject map[string][]*monitoringpb.TimeSeries + maxTimeSeriesPerRequest int +} + +func (fms *fakeMetricServer) CreateTimeSeries(_ context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { + if req == nil { + return nil, errors.New("nil request") + } + if len(req.TimeSeries) < 1 { + return nil, errors.New("there are no time series to add") + } + if len(req.TimeSeries) > fms.maxTimeSeriesPerRequest { + return nil, errors.New("exceeded the max number of time series") + } + + var timeSeriesToProcess []*monitoringpb.TimeSeries + for _, timeSeries := range req.TimeSeries { + if len(timeSeries.Points) == 1 { + timeSeriesToProcess = append(timeSeriesToProcess, timeSeries) + } + } + numErrors := len(req.TimeSeries) - len(timeSeriesToProcess) + + // this is pretty inefficient, but it is only used for testing purposes + for _, singleTimeSeriesToAdd := range timeSeriesToProcess { + // new project with a timeseries + if fms.timeSeriesByProject[req.Name] == nil { + fms.timeSeriesByProject[req.Name] = []*monitoringpb.TimeSeries{singleTimeSeriesToAdd} + } else { // project already exists + for i, singleTimeSeriesInMemory := range fms.timeSeriesByProject[req.Name] { + inMemMetric := singleTimeSeriesInMemory.Metric + toAddMetric := singleTimeSeriesToAdd.Metric + inMemResource := singleTimeSeriesInMemory.Resource + toAddResource := singleTimeSeriesToAdd.Resource + + // if this specific time series already exists, add it + if inMemMetric.Type == toAddMetric.Type && cmp.Equal(inMemMetric.Labels, toAddMetric.Labels) && + inMemResource.Type == toAddResource.Type && cmp.Equal(inMemResource.Labels, toAddResource.Labels) { + // only add this point if the start time of the point to add is greater than the end point latest in this time series + if singleTimeSeriesToAdd.Points[0].Interval.StartTime.AsTime().After(singleTimeSeriesInMemory.Points[len(singleTimeSeriesInMemory.Points)-1].Interval.EndTime.AsTime()) { + // add the new point to the beginning + singleTimeSeriesInMemory.Points = append(singleTimeSeriesToAdd.Points, singleTimeSeriesInMemory.Points...) + } else { + numErrors++ + } + break + // if we make it into this else if block then we are adding a new time series for an existing project -- just append it. + } else if i == len(fms.timeSeriesByProject[req.Name])-1 { + fms.timeSeriesByProject[req.Name] = append(fms.timeSeriesByProject[req.Name], singleTimeSeriesToAdd) + } + } + } + } + + var err error + var response *emptypb.Empty + if numErrors > 0 { + err = fmt.Errorf("there were %d time series that could not be added", numErrors) + } + if numErrors != len(req.TimeSeries) { + response = &emptypb.Empty{} + } + return response, err +} + +func (fms *fakeMetricServer) ListTimeSeries(_ context.Context, req *monitoringpb.ListTimeSeriesRequest) (*monitoringpb.ListTimeSeriesResponse, error) { + if req == nil { + return nil, errors.New("nil request") + } + if req.Aggregation != nil || req.SecondaryAggregation != nil { + return nil, errors.New("fake metric server does not support aggregation") + } + if req.Interval == nil { + return nil, errors.New("time interval required") + } + if req.View == monitoringpb.ListTimeSeriesRequest_HEADERS { + return nil, errors.New("header view is not supported") + } + + expressionFilter, err := parseFilter(req.Filter) + if err != nil { + return nil, err + } + + filters := []listFilter{ + newIntervalFilter(req.Interval), + expressionFilter, + } + + var timeSeriesToReturn []*monitoringpb.TimeSeries + for _, timeSeries := range fms.timeSeriesByProject[req.Name] { + var pointsToReturn []*monitoringpb.Point + pointLabel: + for _, point := range timeSeries.Points { + for _, filter := range filters { + if !filter.filter(timeSeries, point) { + continue pointLabel + } + } + pointsToReturn = append(pointsToReturn, point) + } + if len(pointsToReturn) == 0 { + continue + } + newTimeSeries := &monitoringpb.TimeSeries{ + Metric: timeSeries.Metric, + Resource: timeSeries.Resource, + Metadata: timeSeries.Metadata, + MetricKind: timeSeries.MetricKind, + ValueType: timeSeries.ValueType, + Points: pointsToReturn, + Unit: timeSeries.Unit, + } + timeSeriesToReturn = append(timeSeriesToReturn, newTimeSeries) + } + return &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: timeSeriesToReturn, + }, nil +} + +type listFilter interface { + // filter returns true to keep the given point. + filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool +} + +type dateFilter struct { + startTime time.Time + endTime *time.Time +} + +func newIntervalFilter(interval *monitoringpb.TimeInterval) listFilter { + filter := &dateFilter{ + startTime: interval.StartTime.AsTime(), + } + if interval.EndTime != nil { + filter.endTime = ptr.To(interval.EndTime.AsTime()) + } + return filter +} + +func (f *dateFilter) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + pointStartTime := point.Interval.StartTime.AsTime() + if pointStartTime.Before(f.startTime) { + return false + } + if f.endTime == nil { + return true + } + + pointEndTime := point.Interval.EndTime.AsTime() + return !pointEndTime.After(*f.endTime) +} + +var ( + metricLabelRegex = regexp.MustCompile(`[a-z][a-z0-9_.\/]*`) + quotedStringRegex = regexp.MustCompile(`"(?:[^"\\]|\\.)*"`) + boolRegex = regexp.MustCompile(`(true)|(false)`) + numberRegex = regexp.MustCompile(`\d+`) + expressionObjectRegex = regexp.MustCompile(fmt.Sprintf(`(project)|(group.id)|(metric.type)|(metric.labels.%[1]s)|(resource.type)|(resource.labels.%[1]s)|(metadata.system_labels.%[1]s)`, metricLabelRegex.String())) + expressionOperatorRegex = regexp.MustCompile(`=|>|<|(>=)|(<=)|(!=)|:`) + expressionValueRegex = regexp.MustCompile(fmt.Sprintf(`(%s)|(%s)|(%s)`, quotedStringRegex.String(), boolRegex.String(), numberRegex.String())) + expressionRegex = regexp.MustCompile(fmt.Sprintf(`^(?P%s)\s*(?P%s)\s*(?P%s)(?P\s+.*)?$`, expressionObjectRegex.String(), expressionOperatorRegex.String(), expressionValueRegex.String())) +) + +// parseFilter parses the Google Cloud Monitoring API filter. +// +// See: https://cloud.google.com/monitoring/api/v3/filters#filter_syntax +// +// Currently limited, e.g. does not support NOTs, parenthesis or order of operations. +func parseFilter(filter string) (listFilter, error) { + filter = strings.TrimSpace(filter) + submatches := expressionRegex.FindStringSubmatch(filter) + if submatches == nil { + return nil, fmt.Errorf("invalid expression %q", filter) + } + object := submatches[expressionRegex.SubexpIndex("object")] + operator := submatches[expressionRegex.SubexpIndex("operator")] + value := submatches[expressionRegex.SubexpIndex("value")] + rest := submatches[expressionRegex.SubexpIndex("rest")] + + if operator != "=" { + return nil, fmt.Errorf("invalid operator %q in expression %q", operator, filter) + } + eq := &equalExpression{ + object: object, + // Extract from quotes, if quoted. + value: strings.TrimSuffix(strings.TrimPrefix(value, "\""), "\""), + } + if rest == "" { + return eq, nil + } + + expressionLogicalRegex := regexp.MustCompile(`\s+(?PAND)\s+(?P.*)`) + submatches = expressionLogicalRegex.FindStringSubmatch(rest) + if submatches == nil { + return nil, fmt.Errorf("invalid sub-expression %q", strings.TrimSpace(rest)) + } + + logicalOperator := submatches[expressionLogicalRegex.SubexpIndex("operator")] + rest = submatches[expressionLogicalRegex.SubexpIndex("rest")] + if logicalOperator != "AND" { + return nil, fmt.Errorf("invalid logical operator %q in expression %q", logicalOperator, filter) + } + + inner, err := parseFilter(rest) + if err != nil { + return nil, err + } + return &andExpression{ + left: eq, + right: inner, + }, nil +} + +type equalExpression struct { + object string + value string +} + +func (e *equalExpression) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + return e.value == extractValue(timeSeries, point, e.object) +} + +func extractValue(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point, object string) string { + switch object { + case "project": + return timeSeries.GetResource().GetLabels()["project_id"] + case "metric.type": + return timeSeries.Metric.Type + } + labelsPrefix := "metric.labels." + if strings.HasPrefix(object, labelsPrefix) { + labelName := object[len(labelsPrefix):] + return extractLabel(timeSeries, labelName) + } + return "" +} + +func extractLabel(timeSeries *monitoringpb.TimeSeries, label string) string { + if metadata := timeSeries.GetMetadata(); metadata != nil { + if val, ok := metadata.GetUserLabels()[label]; ok { + return val + } + } + return "" +} + +type andExpression struct { + left, right listFilter +} + +func (e *andExpression) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + return e.left.filter(timeSeries, point) && e.right.filter(timeSeries, point) +} diff --git a/pkg/e2e/fake_metric_server_test.go b/pkg/e2e/fake_metric_server_test.go new file mode 100644 index 0000000000..0b8f21ee09 --- /dev/null +++ b/pkg/e2e/fake_metric_server_test.go @@ -0,0 +1,705 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "reflect" + "testing" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/google/go-cmp/cmp" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +type createTimeSeriesTest struct { + desc string + requests []*monitoringpb.CreateTimeSeriesRequest + // index we expect the newly added timeseries to be in the fake metric server + timeSeriesIndexToCheck []int + // index we expect the newly added point to be in the fake metric server + pointsIndexToCheck []int +} + +type listTimeSeriesTest struct { + desc string + request *monitoringpb.ListTimeSeriesRequest + expected *monitoringpb.ListTimeSeriesResponse +} + +// Returns true if every field in TimeSeries a is deeply equal to TimeSeries b +// ignoring the Points field. False otherwise. +func timeSeriesEqualsExceptPoints(a *monitoringpb.TimeSeries, b *monitoringpb.TimeSeries) bool { + tmp := a.Points + a.Points = b.Points + isEqual := reflect.DeepEqual(a, b) + a.Points = tmp + return isEqual +} + +func TestCreateTimeSeriesBadInput(t *testing.T) { + fms := fakeMetricServer{ + maxTimeSeriesPerRequest: 1, + timeSeriesByProject: make(map[string][]*monitoringpb.TimeSeries), + } + projectName := "projects/1234" + // add a time series to the FakeMetricServer so that + // TestAddPointInPast will fail as expected + timeSeries := []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }} + createTimeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: timeSeries, + } + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + // these are the subtests + tests := []*createTimeSeriesTest{ + { + desc: "TestNil", + requests: []*monitoringpb.CreateTimeSeriesRequest{nil}, + }, + { + desc: "TestExceedMaxTimeSeriesPerRequest", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + // Note: the TimeSeries are empty here since the check for exceeding + // the max timeseries in a requet happens before we verify + // data the data in the TimeSeries. + TimeSeries: []*monitoringpb.TimeSeries{{}, {}}, + }, + }, + }, + { + desc: "TestNoTimeSeriesToAdd", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + }, + }, + }, + { + desc: "TestNoPointInTimeSeriesToAdd", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + }}, + }, + }, + }, + { + desc: "TestAddPointInPast", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + for _, request := range test.requests { + response, err := fms.CreateTimeSeries(context.TODO(), request) + if err == nil && response != nil { + t.Errorf("expected an error for %q", test.desc) + } + } + }) + } +} + +func TestCreateTimeSeries(t *testing.T) { + fms := newFakeMetricServer() + projectName := "projects/1234" + + // these are the subtests + tests := []*createTimeSeriesTest{ + // This test adds a new time series with a new project id to the fake metric server. + // It then adds a new time series to the same project. + // It then adds a new point to the second time series. + { + desc: "TestCreateTimeSeries-NewProject-NewTimeSeries-NewPoint", + timeSeriesIndexToCheck: []int{0, 1, 1}, + pointsIndexToCheck: []int{0, 0, 0}, + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job2", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job2", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 3}, + EndTime: ×tamppb.Timestamp{Seconds: 4}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + for i, request := range test.requests { + response, err := fms.CreateTimeSeries(context.TODO(), request) + if err != nil || response == nil { + t.Errorf("unexpected error: %s", err) + } + if !timeSeriesEqualsExceptPoints( + request.TimeSeries[0], + fms.timeSeriesByProject[projectName][test.timeSeriesIndexToCheck[i]], + ) { + t.Errorf( + "expected %+v and got %+v. Note: the points were not compared", + request.TimeSeries[0], + fms.timeSeriesByProject[projectName][test.timeSeriesIndexToCheck[i]], + ) + } + if !reflect.DeepEqual( + request.TimeSeries[0].Points[0], + fms.timeSeriesByProject[projectName][test.timeSeriesIndexToCheck[i]].Points[test.pointsIndexToCheck[i]], + ) { + t.Errorf( + "expected %+v and got %+v", + request.TimeSeries[0].Points[0], + fms.timeSeriesByProject[projectName][test.timeSeriesIndexToCheck[i]].Points[test.pointsIndexToCheck[i]], + ) + } + } + }) + } +} + +func TestCreateTimeSeriesTwoSeries(t *testing.T) { + fms := newFakeMetricServer() + projectName := "projects/1234" + + request := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }, + { + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe1", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }, + }, + } + response, err := fms.CreateTimeSeries(context.TODO(), request) + if err != nil || response == nil { + t.Errorf("unexpected error: %s", err) + } + for i := range request.TimeSeries { + if !reflect.DeepEqual(request.TimeSeries[i], fms.timeSeriesByProject[projectName][i]) { + t.Errorf("expected %+v and got %+v", request.TimeSeries[i], fms.timeSeriesByProject[projectName][i]) + } + } +} + +func TestListTimeSeriesBadInput(t *testing.T) { + fms := newFakeMetricServer() + projectName := "projects/1234" + filter := "metric.type = \"prometheus.googleapis.com/metric1/gauge\"" + + // these are the subtests + tests := []*listTimeSeriesTest{ + { + desc: "TestListTimeSeriesNilRequest", + request: nil, + }, + {}, + { + desc: "TestListTimeSeriesAggregation", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Aggregation: &monitoringpb.Aggregation{}, + Filter: filter, + }, + }, + { + desc: "TestListTimeSeriesNoInterval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + }, + }, + { + desc: "TestListTimeSeriesHeadersView", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + View: monitoringpb.ListTimeSeriesRequest_HEADERS, + }, + }, + { + desc: "TestListTimeSeriesMalformedFilter", + request: &monitoringpb.ListTimeSeriesRequest{ + + Name: projectName, + Filter: "metric.type = \"prometheus-target\" AND metric.labels.location = \"europe\"", + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + response, err := fms.ListTimeSeries(context.TODO(), test.request) + if err == nil || response != nil { + t.Errorf("expected an error for %q", test.desc) + } + }) + } +} + +func TestListTimeSeries(t *testing.T) { + fms := newFakeMetricServer() + projectName := "projects/1234" + filter := "metric.type = \"prometheus.googleapis.com/metric1/gauge\" AND project = \"example-project\"" + + point1 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + } + + point2 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 4}, + EndTime: ×tamppb.Timestamp{Seconds: 5}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + } + + resource1 := &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + } + + resource2 := &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job2", + "instance": "instance1", + }, + } + + metric := &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + } + + timeSeriesJob1 := &monitoringpb.TimeSeries{ + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + } + + timeSeriesJob2 := &monitoringpb.TimeSeries{ + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + } + + timeSeries := []*monitoringpb.TimeSeries{timeSeriesJob1, timeSeriesJob2} + createTimeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: timeSeries, + } + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + timeSeriesJob1Point2 := &monitoringpb.TimeSeries{ + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2}, + } + + createTimeSeriesRequest.TimeSeries = []*monitoringpb.TimeSeries{timeSeriesJob1Point2} + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + testCases := []*listTimeSeriesTest{ + { + desc: "filter in range short interval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 3}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + { + desc: "filter in range wide interval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 6}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2, point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + { + desc: "interval out of range", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 10}, + EndTime: ×tamppb.Timestamp{Seconds: 11}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + // { + // Resource: resource1, + // Metric: metric, + // MetricKind: metricpb.MetricDescriptor_GAUGE, + // ValueType: metricpb.MetricDescriptor_DOUBLE, + // }, + // { + // Resource: resource2, + // Metric: metric, + // MetricKind: metricpb.MetricDescriptor_GAUGE, + // ValueType: metricpb.MetricDescriptor_DOUBLE, + // }, + }, + }, + }, + { + desc: "filter project", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: "project = \"example-project\"", + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 6}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2, point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + } + + ctx := context.Background() + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + response, err := fms.ListTimeSeries(ctx, tc.request) + if err != nil { + t.Errorf("unexpected error: %s", err) + } + if diff := cmp.Diff(tc.expected, response, protocmp.Transform()); diff != "" { + t.Errorf("expected response (-want, +got) %s", diff) + } + }) + } +} diff --git a/pkg/export/export.go b/pkg/export/export.go index a4d3fbd6b6..6f91a90ea2 100644 --- a/pkg/export/export.go +++ b/pkg/export/export.go @@ -29,7 +29,7 @@ import ( "time" monitoring "cloud.google.com/go/monitoring/apiv3/v2" - monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/go-kit/log" "github.com/go-kit/log/level" gax "github.com/googleapis/gax-go/v2" @@ -511,7 +511,7 @@ func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemp e.triggerNext() } -func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool { +func sampleInRange(sample *monitoringpb.TimeSeries, start, end time.Time) bool { // A sample has exactly one point in the time series. The start timestamp may be unset for gauges. if s := sample.Points[0].Interval.StartTime; s != nil && s.AsTime().Before(start) { return false @@ -522,7 +522,7 @@ func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool return true } -func (e *Exporter) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) { +func (e *Exporter) enqueue(hash uint64, sample *monitoringpb.TimeSeries) { idx := hash % uint64(len(e.shards)) e.shards[idx].enqueue(hash, sample) } @@ -863,7 +863,7 @@ type batch struct { logger log.Logger maxSize uint - m map[string][]*monitoring_pb.TimeSeries + m map[string][]*monitoringpb.TimeSeries shards []*shard oneFull bool total int @@ -876,7 +876,7 @@ func newBatch(logger log.Logger, shardsCount uint, maxSize uint) *batch { return &batch{ logger: logger, maxSize: maxSize, - m: make(map[string][]*monitoring_pb.TimeSeries, 1), + m: make(map[string][]*monitoringpb.TimeSeries, 1), shards: make([]*shard, 0, shardsCount/2), } } @@ -886,12 +886,12 @@ func (b *batch) addShard(s *shard) { } // add a new sample to the batch. Must only be called after full() returned false. -func (b *batch) add(s *monitoring_pb.TimeSeries) { +func (b *batch) add(s *monitoringpb.TimeSeries) { pid := s.Resource.Labels[KeyProjectID] l, ok := b.m[pid] if !ok { - l = make([]*monitoring_pb.TimeSeries, 0, b.maxSize) + l = make([]*monitoringpb.TimeSeries, 0, b.maxSize) } l = append(l, s) b.m[pid] = l @@ -923,7 +923,7 @@ func (b *batch) empty() bool { // requests have completed and notifies the pending shards. func (b batch) send( ctx context.Context, - sendOne func(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error, + sendOne func(context.Context, *monitoringpb.CreateTimeSeriesRequest, ...gax.CallOption) error, ) { // Set timeout so slow requests in the batch do not block overall progress indefinitely. sendCtx, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -935,7 +935,7 @@ func (b batch) send( for pid, l := range b.m { wg.Add(1) - go func(pid string, l []*monitoring_pb.TimeSeries) { + go func(pid string, l []*monitoringpb.TimeSeries) { defer wg.Done() pendingRequests.Inc() @@ -945,7 +945,7 @@ func (b batch) send( // We do not retry any requests due to the risk of producing a backlog // that cannot be worked down, especially if large amounts of clients try to do so. - err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{ + err := sendOne(sendCtx, &monitoringpb.CreateTimeSeriesRequest{ Name: fmt.Sprintf("projects/%s", pid), TimeSeries: l, })