diff --git a/Makefile b/Makefile index f841140b30..29eed63d88 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ endif ifeq ($(KIND_PERSIST), 1) E2E_DOCKER_ARGS += --env KIND_PERSIST=1 endif -E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic +E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic fake-metric-service REGISTRY_NAME=kind-registry REGISTRY_PORT=5001 KIND_PARALLEL?=5 diff --git a/cmd/fake-metric-service/Dockerfile b/cmd/fake-metric-service/Dockerfile new file mode 100644 index 0000000000..169ee9fd0f --- /dev/null +++ b/cmd/fake-metric-service/Dockerfile @@ -0,0 +1,29 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM google-go.pkg.dev/golang:1.22.3@sha256:efc6b824652199ede8bc25e2d5a57076e0e1ffbe90735dcbda3ee956fe33b612 as buildbase +WORKDIR /app +COPY go.mod go.mod +COPY go.sum go.sum +COPY cmd cmd +COPY pkg pkg +COPY vendor vendor + +FROM buildbase as appbase +RUN CGO_ENABLED=1 GOEXPERIMENT=boringcrypto \ + go build -tags boring -mod=vendor -o fake-metric-service cmd/fake-metric-service/*.go + +FROM gke.gcr.io/gke-distroless/libc:gke_distroless_20240307.00_p0@sha256:4f834e207f2721977094aeec4c9daee7032c5daec2083c0be97760f4306e4f88 +COPY --from=appbase /app/fake-metric-service /bin/fake-metric-service +ENTRYPOINT ["/bin/fake-metric-service"] diff --git a/cmd/fake-metric-service/main.go b/cmd/fake-metric-service/main.go new file mode 100644 index 0000000000..f11f241f6b --- /dev/null +++ b/cmd/fake-metric-service/main.go @@ -0,0 +1,144 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "flag" + "net" + "net/http" + "os" + "sync" + "time" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/GoogleCloudPlatform/prometheus-engine/pkg/e2e" + "github.com/GoogleCloudPlatform/prometheus-engine/pkg/export" + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/proto" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log/zap" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" +) + +func main() { + logVerbosity := 0 + addr := ":8080" + metricServiceAddr := ":8081" + + flag.IntVar(&logVerbosity, "v", logVerbosity, "Logging verbosity") + flag.StringVar(&addr, "addr", addr, "Address to serve probe statuses (e.g. /readyz and /livez) and /metrics") + flag.StringVar(&metricServiceAddr, "metric-service-addr", metricServiceAddr, "Address to serve a mock metric service server.") + flag.Parse() + + logger := zap.New(zap.Level(zapcore.Level(-logVerbosity))) + ctrl.SetLogger(logger) + + ctx := signals.SetupSignalHandler() + if err := run(ctx, logger, addr, metricServiceAddr); err != nil { + logger.Error(err, "exit with error") + os.Exit(1) + } +} + +func run(ctx context.Context, logger logr.Logger, probeAddr, metricServiceAddr string) error { + listener, err := net.Listen("tcp", metricServiceAddr) + if err != nil { + return err + } + + wg := sync.WaitGroup{} + errs := make(chan error, 1) + + logger.Info("starting server...") + metricDatabase := e2e.NewMetricDatabase() + + { + server := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + now := time.Now() + logger.Info("grpc request", "method", info.FullMethod, "time", now, "data", prototext.Format(req.(proto.Message))) + resp, err := handler(ctx, req) + logger.Info("grpc response", "method", info.FullMethod, "time", now, "duration", time.Since(now)) + if err != nil { + logger.Error(err, "grpc failure", "method", info.FullMethod, "time", now) + } + return resp, err + })) + monitoringpb.RegisterMetricServiceServer(server, e2e.NewFakeMetricServer(metricDatabase)) + + wg.Add(1) + go func() { + for range ctx.Done() { + server.GracefulStop() + return + } + }() + go func() { + defer wg.Done() + if err := server.Serve(listener); err != nil { + errs <- err + } + }() + } + + { + registry := prometheus.NewRegistry() + registry.MustRegister(e2e.NewMetricCollector(logger, export.MetricTypePrefix, metricDatabase)) + + wg.Add(1) + mux := http.NewServeMux() + mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + mux.Handle("/livez", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + })) + mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{Registry: registry})) + + server := http.Server{ + Addr: probeAddr, + Handler: mux, + } + go func() { + for range ctx.Done() { + // Start new context because ours is done. + if err := server.Shutdown(context.Background()); err != nil { + errs <- err + } + } + }() + go func() { + defer wg.Done() + if err := server.ListenAndServe(); err != nil { + errs <- err + } + }() + } + + go func() { + wg.Wait() + close(errs) + }() + + for err := range errs { + return err + } + return nil +} diff --git a/e2e/collector_test.go b/e2e/collector_test.go index ae4b27b8ef..01ba46b95e 100644 --- a/e2e/collector_test.go +++ b/e2e/collector_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - gcm "cloud.google.com/go/monitoring/apiv3/v2" gcmpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/GoogleCloudPlatform/prometheus-engine/e2e/kube" monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1" @@ -79,9 +78,7 @@ func TestCollectorPodMonitoring(t *testing.T) { }, } t.Run("self-podmonitoring-ready", testEnsurePodMonitoringReady(ctx, kubeClient, pm)) - if !skipGCM { - t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-podmon")) - } + t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, restConfig, kubeClient, "collector-podmon")) } func TestCollectorClusterPodMonitoring(t *testing.T) { @@ -120,9 +117,7 @@ func TestCollectorClusterPodMonitoring(t *testing.T) { }, } t.Run("self-clusterpodmonitoring-ready", testEnsureClusterPodMonitoringReady(ctx, kubeClient, cpm)) - if !skipGCM { - t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-cmon")) - } + t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, restConfig, kubeClient, "collector-cmon")) } func TestCollectorKubeletScraping(t *testing.T) { @@ -138,9 +133,7 @@ func TestCollectorKubeletScraping(t *testing.T) { t.Run("collector-operatorconfig", testCollectorOperatorConfig(ctx, kubeClient)) t.Run("enable-kubelet-scraping", testEnableKubeletScraping(ctx, kubeClient)) - if !skipGCM { - t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, kubeClient)) - } + t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, restConfig, kubeClient)) } // testCollectorDeployed does a high-level verification on whether the @@ -440,12 +433,12 @@ func testEnableKubeletScraping(ctx context.Context, kubeClient client.Client) fu // testValidateCollectorUpMetrics checks whether the scrape-time up metrics for all collector // pods can be queried from GCM. -func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Client, job string) func(*testing.T) { +func testValidateCollectorUpMetrics(ctx context.Context, restConfig *rest.Config, kubeClient client.Client, job string) func(*testing.T) { return func(t *testing.T) { t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx, t, restConfig, kubeClient) if err != nil { t.Fatalf("create metric client: %s", err) } @@ -480,7 +473,7 @@ func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Clien Filter: fmt.Sprintf(` resource.type = "prometheus_target" AND resource.labels.project_id = "%s" AND - resource.label.location = "%s" AND + resource.labels.location = "%s" AND resource.labels.cluster = "%s" AND resource.labels.namespace = "%s" AND resource.labels.job = "%s" AND @@ -519,12 +512,12 @@ func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Clien } // testCollectorScrapeKubelet verifies that kubelet metric endpoints are successfully scraped. -func testCollectorScrapeKubelet(ctx context.Context, kubeClient client.Client) func(*testing.T) { +func testCollectorScrapeKubelet(ctx context.Context, restConfig *rest.Config, kubeClient client.Client) func(*testing.T) { return func(t *testing.T) { t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx, t, restConfig, kubeClient) if err != nil { t.Fatalf("create GCM metric client: %s", err) } @@ -548,7 +541,7 @@ func testCollectorScrapeKubelet(ctx context.Context, kubeClient client.Client) f Filter: fmt.Sprintf(` resource.type = "prometheus_target" AND resource.labels.project_id = "%s" AND - resource.label.location = "%s" AND + resource.labels.location = "%s" AND resource.labels.cluster = "%s" AND resource.labels.job = "kubelet" AND resource.labels.instance = "%s:%s" AND diff --git a/e2e/deploy/deploy.go b/e2e/deploy/deploy.go index c3c0254208..9e8fdbd6e5 100644 --- a/e2e/deploy/deploy.go +++ b/e2e/deploy/deploy.go @@ -49,12 +49,14 @@ func CreateResources(ctx context.Context, kubeClient client.Client, deployOpts . type DeployOption func(*deployOptions) type deployOptions struct { - operatorNamespace string - publicNamespace string - projectID string - cluster string - location string - disableGCM bool + operatorNamespace string + publicNamespace string + projectID string + cluster string + location string + disableGCM bool + gcmEndpoint string + prometheusEndpoint string } func (opts *deployOptions) setDefaults() { @@ -92,6 +94,18 @@ func WithDisableGCM(disableGCM bool) DeployOption { } } +func WithGCMEndpoint(gcmEndpoint string) DeployOption { + return func(opts *deployOptions) { + opts.gcmEndpoint = gcmEndpoint + } +} + +func WithPrometheusEndpoint(prometheusEndpoint string) DeployOption { + return func(opts *deployOptions) { + opts.prometheusEndpoint = prometheusEndpoint + } +} + func createResources(ctx context.Context, kubeClient client.Client, normalizeFn func(client.Object) (client.Object, error)) error { resources, err := resources(kubeClient.Scheme()) if err != nil { @@ -130,16 +144,18 @@ func resources(scheme *runtime.Scheme) ([]client.Object, error) { } func normalizeDeamonSets(opts *deployOptions, obj *appsv1.DaemonSet) (client.Object, error) { - if !opts.disableGCM { - return obj, nil - } if obj.GetName() != operator.NameCollector { return obj, nil } for i := range obj.Spec.Template.Spec.Containers { container := &obj.Spec.Template.Spec.Containers[i] if container.Name == operator.CollectorPrometheusContainerName { - container.Args = append(container.Args, "--export.debug.disable-auth") + if opts.disableGCM || opts.gcmEndpoint != "" { + container.Args = append(container.Args, "--export.debug.disable-auth") + } + if opts.gcmEndpoint != "" { + container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint)) + } return obj, nil } } @@ -169,15 +185,23 @@ func normalizeDeployments(opts *deployOptions, obj *appsv1.Deployment) (client.O container.Args = append(container.Args, fmt.Sprintf("--public-namespace=%s", opts.publicNamespace)) } case operator.NameRuleEvaluator: - if !opts.disableGCM { - break - } container, err := kube.DeploymentContainer(obj, operator.RuleEvaluatorContainerName) if err != nil { return nil, fmt.Errorf("unable to find rule-evaluator %q container: %w", operator.RuleEvaluatorContainerName, err) } - container.Args = append(container.Args, "--export.debug.disable-auth") - container.Args = append(container.Args, "--query.debug.disable-auth") + if opts.disableGCM || opts.gcmEndpoint != "" { + container.Args = append(container.Args, "--export.debug.disable-auth") + } + if opts.gcmEndpoint != "" { + container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint)) + } + + if opts.disableGCM || opts.prometheusEndpoint != "" { + container.Args = append(container.Args, "--query.debug.disable-auth") + } + if opts.prometheusEndpoint != "" { + container.Args = append(container.Args, fmt.Sprintf("--query.target-url=%s", opts.prometheusEndpoint)) + } } return obj, nil } diff --git a/e2e/deploy/metric_service.go b/e2e/deploy/metric_service.go new file mode 100644 index 0000000000..2854630993 --- /dev/null +++ b/e2e/deploy/metric_service.go @@ -0,0 +1,264 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package deploy + +import ( + "context" + "fmt" + + "github.com/GoogleCloudPlatform/prometheus-engine/e2e/kube" + "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func CreateFakeMetricService(ctx context.Context, kubeClient client.Client, namespace, name, image string) error { + labels := map[string]string{ + "app.kubernetes.io/name": "fake-metric-service", + } + if err := kubeClient.Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "server", + Image: image, + Ports: []corev1.ContainerPort{ + { + Name: "web", + ContainerPort: 8080, + }, + { + Name: "metric-service", + ContainerPort: 8081, + }, + }, + Env: []corev1.EnvVar{ + { + Name: "GRPC_GO_LOG_VERBOSITY_LEVEL", + Value: "99", + }, + { + Name: "GRPC_GO_LOG_SEVERITY_LEVEL", + Value: "info", + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "livez", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "readyz", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + }, + }, + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service deployment: %w", err) + } + if err := kubeClient.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-grpc", + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Port: 8081, + TargetPort: intstr.FromString("metric-service"), + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service service: %w", err) + } + if err := kubeClient.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name + "-web", + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Port: 8080, + TargetPort: intstr.FromString("web"), + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service service: %w", err) + } + return kube.WaitForDeploymentReady(ctx, kubeClient, namespace, name) +} + +func FakeMetricServiceEndpoint(namespace, name string) string { + return fmt.Sprintf("%s-grpc.%s.svc.cluster.local:8081", name, namespace) +} + +func FakeMetricServiceWebEndpoint(namespace, name string) string { + return fmt.Sprintf("%s-web.%s.svc.cluster.local:8080", name, namespace) +} + +func CreateFakeMetricCollector(ctx context.Context, kubeClient client.Client, namespace, name, metricServiceEndpoint string) error { + labels := map[string]string{ + "app.kubernetes.io/name": "fake-metric-collector", + } + if err := kubeClient.Create(ctx, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: map[string]string{ + // Use a static config because we don't want Kubernetes meta-labels. + "config.yaml": fmt.Sprintf(` +global: + scrape_interval: 5s +scrape_configs: + - job_name: metric-collector + static_configs: + - targets: ["%s"] +`, metricServiceEndpoint), + }, + }); err != nil { + return err + } + if err := kubeClient.Create(ctx, &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + ServiceAccountName: operator.NameCollector, + Containers: []corev1.Container{ + { + Name: "prometheus", + Image: "prom/prometheus:v2.52.0", + Args: []string{ + "--config.file=/config/config.yaml", + "--web.listen-address=:8080", + "--web.enable-lifecycle", + "--web.route-prefix=/", + }, + Ports: []corev1.ContainerPort{ + { + Name: "api", + ContainerPort: 8080, + }, + }, + LivenessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/-/healthy", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/-/ready", + Port: intstr.FromInt(8080), + Scheme: "HTTP", + }, + }, + }, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "config", + ReadOnly: true, + MountPath: "/config", + }, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "config", + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: name, + }, + }, + }, + }, + }, + }, + }, + }, + }); err != nil { + return fmt.Errorf("create prometheus deployment: %w", err) + } + if err := kubeClient.Create(ctx, &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Spec: corev1.ServiceSpec{ + Selector: labels, + Ports: []corev1.ServicePort{ + { + Port: 8080, + TargetPort: intstr.FromString("api"), + }, + }, + }, + }); err != nil { + return fmt.Errorf("create metric-service service: %w", err) + } + return kube.WaitForDeploymentReady(ctx, kubeClient, namespace, name) +} + +func FakeMetricCollectorEndpoint(namespace, name string) string { + return fmt.Sprintf("%s.%s.svc.cluster.local:8080", name, namespace) +} diff --git a/e2e/kube/network.go b/e2e/kube/network.go index 388451c321..8ddec3a1ed 100644 --- a/e2e/kube/network.go +++ b/e2e/kube/network.go @@ -20,11 +20,19 @@ package kube import ( "context" + "errors" "fmt" "io" "net" "net/http" + "net/url" + "strconv" + "strings" + "sync" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/rest" "k8s.io/client-go/tools/portforward" "k8s.io/client-go/transport/spdy" @@ -147,3 +155,158 @@ func PortForwardClient(restConfig *rest.Config, kubeClient client.Client, out, e }, }, nil } + +func PortForward(ctx context.Context, restConfig *rest.Config, kubeClient client.Client, address string, out, errOut io.Writer) (net.Conn, error) { + restClient, err := rest.RESTClientFor(restConfig) + if err != nil { + return nil, fmt.Errorf("unable to create REST client: %w", err) + } + return portForward(ctx, restConfig, restClient, kubeClient, address, out, errOut) +} + +func portForward(ctx context.Context, restConfig *rest.Config, restClient *rest.RESTClient, kubeClient client.Client, address string, out, errOut io.Writer) (net.Conn, error) { + // Cannot parse a URL without a scheme. + if !strings.HasPrefix(address, "http") { + address = "http://" + address + } + url, err := url.Parse(address) + if err != nil { + return nil, fmt.Errorf("unable to parse address %q: %s", address, err) + } + ip := net.ParseIP(url.Host) + urlPort := url.Port() + if urlPort == "" { + return nil, errors.New("unknown port") + } + port, err := strconv.Atoi(urlPort) + if err != nil { + return nil, fmt.Errorf("invalid port: %s", address) + } + + var pod *corev1.Pod + var containerName string + if ip != nil { + pod, containerName, err = podByAddr(ctx, kubeClient, &net.TCPAddr{ + IP: ip, + Port: port, + }) + if err != nil { + return nil, fmt.Errorf("unable to get pod from IP %s: %w", url.Host, err) + } + } else { + split := strings.SplitN(url.Hostname(), ".", 3) + if len(split) != 3 || split[2] != "svc.cluster.local" { + return nil, fmt.Errorf("invalid address format: %s", url.Host) + } + + service := corev1.Service{ + ObjectMeta: v1.ObjectMeta{ + Name: split[0], + Namespace: split[1], + }, + } + if err := kubeClient.Get(ctx, client.ObjectKeyFromObject(&service), &service); err != nil { + return nil, fmt.Errorf("unable to get service %q: %s", client.ObjectKeyFromObject(&service), err) + } + + pods, err := podsFromSelector(ctx, kubeClient, v1.SetAsLabelSelector(labels.Set(service.Spec.Selector))) + if err != nil { + return nil, fmt.Errorf("unable to get pods from service %q selector: %s", client.ObjectKeyFromObject(&service), err) + } + + if len(pods) == 0 { + return nil, fmt.Errorf("found not pods from service %q selector: %s", client.ObjectKeyFromObject(&service), err) + } + + for i := range pods { + if containerIndex := podPortContainerIndex(kubeClient, &pods[i], port); containerIndex != -1 { + pod = &pods[i] + containerName = pods[i].Spec.Containers[containerIndex].Name + break + } + } + if containerName == "" { + return nil, fmt.Errorf("unable to find container for port %d", port) + } + } + + if err := waitForPodContainerReady(ctx, kubeClient, pod, containerName); err != nil { + return nil, fmt.Errorf("failed waiting for pod from IP %s: %w", ip, err) + } + resourceURL := restClient. + Post(). + Resource("pods"). + Namespace(pod.GetNamespace()). + Name(pod.GetName()). + SubResource("portforward"). + URL() + + transport, upgrader, err := spdy.RoundTripperFor(restConfig) + if err != nil { + return nil, err + } + client := &http.Client{ + Transport: transport, + } + + stopCh := make(chan struct{}) + readyCh := make(chan struct{}) + errCh := make(chan error) + forwardDialer := spdy.NewDialer(upgrader, client, http.MethodPost, resourceURL) + forwarder, err := portforward.NewOnAddresses( + forwardDialer, + // Specify IPv4 address explicitly, since GitHub Actions does not support IPv6. + []string{"127.0.0.1"}, + // The leading colon indicates that a random port is chosen. + []string{fmt.Sprintf(":%d", port)}, + stopCh, + readyCh, + out, + errOut, + ) + if err != nil { + return nil, err + } + + go func() { + if err := forwarder.ForwardPorts(); err != nil { + errCh <- err + } + close(errCh) + }() + + // Protect against multiple closes. + closeForwarder := sync.OnceFunc(func() { + // readyCh is closed by the port-forwarder. + close(stopCh) + }) + + select { + case <-readyCh: + ports, err := forwarder.GetPorts() + if err != nil { + return nil, err + } + if len(ports) != 1 { + return nil, fmt.Errorf("expected 1 port but found %d", len(ports)) + } + port := ports[0] + + // Pass in tcp4 to ensure we always get IPv4 and never IPv6. + var dialer net.Dialer + conn, err := dialer.DialContext(ctx, "tcp4", fmt.Sprintf("127.0.0.1:%d", port.Local)) + if err != nil { + return nil, err + } + return &wrappedConn{ + Conn: conn, + closeFn: closeForwarder, + }, nil + case <-stopCh: + closeForwarder() + return nil, fmt.Errorf("port forwarding stopped unexpectedly") + case err := <-errCh: + closeForwarder() + return nil, fmt.Errorf("port forwarding failed: %w", err) + } +} diff --git a/e2e/kube/pod.go b/e2e/kube/pod.go index 996bf3e84c..134a1219f6 100644 --- a/e2e/kube/pod.go +++ b/e2e/kube/pod.go @@ -120,12 +120,9 @@ func podByAddr(ctx context.Context, kubeClient client.Client, addr *net.TCPAddr) if err != nil { return nil, "", err } - for _, container := range pod.Spec.Containers { - for _, port := range container.Ports { - if int(port.ContainerPort) == addr.Port { - return pod, container.Name, nil - } - } + containerIndex := podPortContainerIndex(kubeClient, pod, addr.Port) + if containerIndex != -1 { + return pod, pod.Spec.Containers[containerIndex].Name, nil } key := client.ObjectKeyFromObject(pod) return nil, "", fmt.Errorf("unable to find port %d in pod %s", addr.Port, key) @@ -144,3 +141,14 @@ func podsFromSelector(ctx context.Context, kubeClient client.Client, ps *metav1. } return podList.Items, nil } + +func podPortContainerIndex(_ client.Client, pod *corev1.Pod, port int) int { + for i, container := range pod.Spec.Containers { + for _, p := range container.Ports { + if int(p.ContainerPort) == port { + return i + } + } + } + return -1 +} diff --git a/e2e/main_test.go b/e2e/main_test.go index 915156f3de..c699887e4b 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -22,11 +22,17 @@ import ( "context" "flag" "fmt" + "net" + "net/http" "os" "testing" "time" + gcm "cloud.google.com/go/monitoring/apiv3/v2" "go.uber.org/zap/zapcore" + "google.golang.org/api/option" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" metav1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" @@ -51,9 +57,12 @@ const ( var ( projectID, location, cluster string - skipGCM bool + fakeGCM bool pollDuration time.Duration + imageTag string + imageRegistryPort int + gcpServiceAccount string ) @@ -64,18 +73,20 @@ func TestMain(m *testing.M) { flag.StringVar(&projectID, "project-id", "", "The GCP project to write metrics to.") flag.StringVar(&location, "location", "", "The location of the Kubernetes cluster that's tested against.") flag.StringVar(&cluster, "cluster", "", "The name of the Kubernetes cluster that's tested against.") - flag.BoolVar(&skipGCM, "skip-gcm", false, "Skip validating GCM ingested points.") + flag.BoolVar(&fakeGCM, "fake-metric-service", true, "Use a fake GCM endpoint.") flag.DurationVar(&pollDuration, "duration", 3*time.Second, "How often to poll and retry for resources.") flag.StringVar(&gcpServiceAccount, "gcp-service-account", "", "Path to GCP service account file for usage by deployed containers.") + flag.StringVar(&imageTag, "image-tag", "", "The tag to use from the local registry.") + flag.IntVar(&imageRegistryPort, "registry-port", -1, "The port of the local registry.") + flag.Parse() os.Exit(m.Run()) } func setupCluster(ctx context.Context, t testing.TB) (client.Client, *rest.Config, error) { - t.Log(">>> deploying static resources") restConfig, err := newRestConfig() if err != nil { return nil, nil, err @@ -86,7 +97,8 @@ func setupCluster(ctx context.Context, t testing.TB) (client.Client, *rest.Confi return nil, nil, err } - if err := createResources(ctx, kubeClient); err != nil { + t.Log(">>> deploying static resources") + if err := createResources(ctx, kubeClient, imageTag, imageRegistryPort); err != nil { return nil, nil, err } @@ -153,10 +165,30 @@ func newScheme() (*runtime.Scheme, error) { return scheme, nil } -func createResources(ctx context.Context, kubeClient client.Client) error { - if err := deploy.CreateResources(ctx, kubeClient, deploy.WithMeta(projectID, cluster, location), deploy.WithDisableGCM(skipGCM)); err != nil { +func createResources(ctx context.Context, kubeClient client.Client, imageTag string, imageRegistryPort int) error { + deployOpts := []deploy.DeployOption{ + deploy.WithMeta(projectID, cluster, location), + } + if fakeGCM { + if imageRegistryPort == -1 { + return fmt.Errorf("--registry-port must be provided with --fake-metric-service") + } + metricServiceImage := fmt.Sprintf("localhost:%d/fake-metric-service:%s", imageRegistryPort, imageTag) + if err := deploy.CreateFakeMetricService(ctx, kubeClient, metav1.NamespaceDefault, "metric-service", metricServiceImage); err != nil { + return err + } + deployOpts = append(deployOpts, deploy.WithGCMEndpoint(deploy.FakeMetricServiceEndpoint(metav1.NamespaceDefault, "metric-service"))) + deployOpts = append(deployOpts, deploy.WithPrometheusEndpoint("http://"+deploy.FakeMetricCollectorEndpoint(operator.DefaultOperatorNamespace, "metric-collector"))) + } + if err := deploy.CreateResources(ctx, kubeClient, deployOpts...); err != nil { return err } + if fakeGCM { + // Create once the GMP system namespace exists. + if err := deploy.CreateFakeMetricCollector(ctx, kubeClient, operator.DefaultOperatorNamespace, "metric-collector", deploy.FakeMetricServiceWebEndpoint(metav1.NamespaceDefault, "metric-service")); err != nil { + return err + } + } if gcpServiceAccount == "" { gcpServiceAccount, _ = os.LookupEnv("GOOGLE_APPLICATION_CREDENTIALS") @@ -166,7 +198,7 @@ func createResources(ctx context.Context, kubeClient client.Client) error { if err != nil { return fmt.Errorf("read service account file %q: %w", gcpServiceAccount, err) } - if err := deploy.CreateGCPSecretResources(context.Background(), kubeClient, metav1.NamespaceDefault, b); err != nil { + if err := deploy.CreateGCPSecretResources(ctx, kubeClient, metav1.NamespaceDefault, b); err != nil { return err } } @@ -187,3 +219,48 @@ func contextWithDeadline(t *testing.T) context.Context { t.Cleanup(cancel) return ctx } + +func newMetricClient(ctx context.Context, t *testing.T, restConfig *rest.Config, kubeClient client.Client) (*gcm.MetricClient, error) { + clientOpts := []option.ClientOption{ + option.WithUserAgent("prometheus-engine-e2e"), + } + if fakeGCM { + clientOpts = append(clientOpts, + option.WithoutAuthentication(), + option.WithGRPCDialOption(grpc.WithTransportCredentials(insecure.NewCredentials())), + option.WithEndpoint(deploy.FakeMetricServiceEndpoint(metav1.NamespaceDefault, "metric-service")), + option.WithGRPCDialOption(grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { + t.Log("Forwarding address:", address) + conn, err := kube.PortForward(ctx, restConfig, kubeClient, address, + writerFn(func(p []byte) (n int, err error) { + t.Logf("portforward: info: %s", string(p)) + return len(p), nil + }), + writerFn(func(p []byte) (n int, err error) { + t.Logf("portforward: error: %s", string(p)) + return len(p), nil + })) + if err != nil { + t.Error("unable to port-forward:", err) + } + return conn, err + })), + ) + } + return gcm.NewMetricClient(ctx, clientOpts...) +} + +func newPortForwardClient(t *testing.T, restConfig *rest.Config, kubeClient client.Client) (*http.Client, error) { + return kube.PortForwardClient( + restConfig, + kubeClient, + writerFn(func(p []byte) (n int, err error) { + t.Logf("portforward: info: %s", string(p)) + return len(p), nil + }), + writerFn(func(p []byte) (n int, err error) { + t.Logf("portforward: error: %s", string(p)) + return len(p), nil + }), + ) +} diff --git a/e2e/ruler_test.go b/e2e/ruler_test.go index a559c38f9f..aba0efeda7 100644 --- a/e2e/ruler_test.go +++ b/e2e/ruler_test.go @@ -27,7 +27,6 @@ import ( "testing" "time" - gcm "cloud.google.com/go/monitoring/apiv3/v2" gcmpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator" monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1" @@ -85,9 +84,7 @@ func testRuleEvaluator(t *testing.T, features monitoringv1.OperatorFeatures) { t.Run("rule-evaluator-configuration", testRuleEvaluatorConfiguration(ctx, kubeClient)) t.Run("rules-create", testCreateRules(ctx, restConfig, kubeClient, operator.DefaultOperatorNamespace, metav1.NamespaceDefault, features)) - if !skipGCM { - t.Run("rules-gcm", testValidateRuleEvaluationMetrics(ctx)) - } + t.Run("rules-gcm", testValidateRuleEvaluationMetrics(ctx, restConfig, kubeClient)) } func testRuleEvaluatorDeployed(ctx context.Context, kubeClient client.Client) func(*testing.T) { @@ -540,18 +537,7 @@ func testCreateRules( t.Fatalf("failed waiting for generated rules: %s", err) } - httpClient, err := kube.PortForwardClient( - restConfig, - kubeClient, - writerFn(func(p []byte) (n int, err error) { - t.Logf("portforward: info: %s", string(p)) - return len(p), nil - }), - writerFn(func(p []byte) (n int, err error) { - t.Logf("portforward: error: %s", string(p)) - return len(p), nil - }), - ) + httpClient, err := newPortForwardClient(t, restConfig, kubeClient) if err != nil { t.Fatalf("failed to create port forward client: %s", err) } @@ -601,18 +587,19 @@ func testCreateRules( } } -func testValidateRuleEvaluationMetrics(ctx context.Context) func(*testing.T) { +func testValidateRuleEvaluationMetrics(ctx context.Context, restConfig *rest.Config, kubeClient client.Client) func(*testing.T) { return func(t *testing.T) { t.Log("checking for metrics in Cloud Monitoring") // Wait for metric data to show up in Cloud Monitoring. - metricClient, err := gcm.NewMetricClient(ctx) + metricClient, err := newMetricClient(ctx, t, restConfig, kubeClient) if err != nil { t.Fatalf("create metric client: %s", err) } defer metricClient.Close() - err = wait.PollUntilContextTimeout(ctx, 3*time.Second, 3*time.Minute, true, func(ctx context.Context) (bool, error) { + err = errors.New("timed out querying time series") + pollErr := wait.PollUntilContextTimeout(ctx, 3*time.Second, 3*time.Minute, true, func(ctx context.Context) (bool, error) { now := time.Now() // Validate the majority of labels being set correctly by filtering along them. @@ -633,25 +620,32 @@ func testValidateRuleEvaluationMetrics(ctx context.Context) func(*testing.T) { StartTime: timestamppb.New(now.Add(-10 * time.Second)), }, }) - series, err := iter.Next() + var series *gcmpb.TimeSeries + series, err = iter.Next() if err == iterator.Done { + err = errors.New("no data in GCM") t.Logf("no data in GCM, retrying...") return false, nil } else if err != nil { return false, fmt.Errorf("querying metrics failed: %w", err) } if len(series.Points) == 0 { - return false, errors.New("unexpected zero points in result series") + err = errors.New("unexpected zero points in result series") + return false, err } // We expect exactly one result. series, err = iter.Next() if err != iterator.Done { - return false, fmt.Errorf("expected iterator to be done but series %v: %w", series, err) + err = fmt.Errorf("expected iterator to be done but series %v: %w", series, err) + return false, err } return true, nil }) - if err != nil { - t.Fatalf("waiting for rule metrics to appear in GCM failed: %s", err) + if pollErr != nil { + if wait.Interrupted(pollErr) && err != nil { + pollErr = err + } + t.Fatalf("waiting for rule metrics to appear in GCM failed: %s", pollErr) } } } diff --git a/go.mod b/go.mod index 6163d00842..38bec5ab70 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/stretchr/testify v1.8.4 github.com/thanos-io/thanos v0.34.2-0.20240314081355-f731719f9515 go.uber.org/zap v1.26.0 + golang.org/x/exp v0.0.0-20240119083558-1b970713d09a golang.org/x/mod v0.15.0 golang.org/x/oauth2 v0.17.0 golang.org/x/time v0.5.0 @@ -126,7 +127,6 @@ require ( go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.21.0 // indirect - golang.org/x/exp v0.0.0-20240119083558-1b970713d09a // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/hack/kind-test.sh b/hack/kind-test.sh index 6da3eb81b3..4bf2ba0c45 100755 --- a/hack/kind-test.sh +++ b/hack/kind-test.sh @@ -23,7 +23,7 @@ GO_TEST=$1 REPO_ROOT=$(dirname "${BASH_SOURCE[0]}")/.. TAG_NAME=$(date "+gmp-%Y%d%m_%H%M") -TEST_ARGS="" +TEST_ARGS="-image-tag=${TAG_NAME} -registry-port=${REGISTRY_PORT}" # Convert kind cluster name to required regex if necessary. # We need to ensure this name is not too long due to: https://github.com/kubernetes-sigs/kind/issues/623 # while still unique enough to avoid dups between similar test names when trimming. @@ -36,9 +36,6 @@ KUBECTL="kubectl --context kind-${KIND_CLUSTER}" GMP_CLUSTER=$TAG_NAME if [[ -n "${GOOGLE_APPLICATION_CREDENTIALS:-}" ]]; then PROJECT_ID=$(jq -r '.project_id' "${GOOGLE_APPLICATION_CREDENTIALS}") -else - echo ">>> no credentials specified. running without GCM validation" - TEST_ARGS="${TEST_ARGS} -skip-gcm" fi TEST_ARGS="${TEST_ARGS} -project-id=${PROJECT_ID} -location=${GMP_LOCATION} -cluster=${GMP_CLUSTER}" diff --git a/pkg/e2e/filter.go b/pkg/e2e/filter.go new file mode 100644 index 0000000000..f9a75144b5 --- /dev/null +++ b/pkg/e2e/filter.go @@ -0,0 +1,215 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "fmt" + "regexp" + "strings" + "time" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "k8s.io/utils/ptr" +) + +var ( + metricLabelRegex = regexp.MustCompile(`[a-z][a-z0-9_.\/]*`) + quotedStringRegex = regexp.MustCompile(`"(?:[^"\\]|\\.)*"`) + boolRegex = regexp.MustCompile(`(true)|(false)`) + numberRegex = regexp.MustCompile(`\d+`) + expressionObjectRegex = regexp.MustCompile(fmt.Sprintf(`(project)|(group\.id)|(metric\.type)|(metric\.labels\.%[1]s)|(resource\.type)|(resource\.labels\.%[1]s)|(metadata\.system_labels\.%[1]s)`, metricLabelRegex.String())) + expressionOperatorRegex = regexp.MustCompile(`=|>|<|(>=)|(<=)|(!=)|:`) + expressionValueRegex = regexp.MustCompile(fmt.Sprintf(`(%s)|(%s)|(%s)`, quotedStringRegex.String(), boolRegex.String(), numberRegex.String())) + expressionRegex = regexp.MustCompile(fmt.Sprintf(`^(?P%s)\s*(?P%s)\s*(?P%s)(?P\s+.*)?$`, expressionObjectRegex.String(), expressionOperatorRegex.String(), expressionValueRegex.String())) +) + +func runFilter(timeSeriesList []*monitoringpb.TimeSeries, filter pointFilter) []*monitoringpb.TimeSeries { + var timeSeriesFiltered []*monitoringpb.TimeSeries + for _, timeSeries := range timeSeriesList { + var pointsFiltered []*monitoringpb.Point + pointLabel: + for _, point := range timeSeries.Points { + if !filter.filter(timeSeries, point) { + continue pointLabel + } + pointsFiltered = append(pointsFiltered, point) + } + if len(pointsFiltered) == 0 { + continue + } + timeSeriesOut := &monitoringpb.TimeSeries{ + Metric: timeSeries.Metric, + Resource: timeSeries.Resource, + Metadata: timeSeries.Metadata, + MetricKind: timeSeries.MetricKind, + ValueType: timeSeries.ValueType, + Points: pointsFiltered, + Unit: timeSeries.Unit, + } + timeSeriesFiltered = append(timeSeriesFiltered, timeSeriesOut) + } + return timeSeriesFiltered +} + +// parseFilter parses the Google Cloud Monitoring API filter. +// +// See: https://cloud.google.com/monitoring/api/v3/filters#filter_syntax +// +// Currently limited, e.g. does not support NOTs, parenthesis or order of operations. +func parseFilter(filter string) (pointFilter, error) { + filter = strings.TrimSpace(filter) + filter = strings.ReplaceAll(filter, "\n", " ") + submatches := expressionRegex.FindStringSubmatch(filter) + if submatches == nil { + return nil, fmt.Errorf("invalid expression %q", filter) + } + object := submatches[expressionRegex.SubexpIndex("object")] + operator := submatches[expressionRegex.SubexpIndex("operator")] + value := submatches[expressionRegex.SubexpIndex("value")] + rest := submatches[expressionRegex.SubexpIndex("rest")] + + if operator != "=" { + return nil, fmt.Errorf("unsupported operator %q in expression %q", operator, filter) + } + eq := &equalExpression{ + object: object, + // Extract from quotes, if quoted. + value: strings.TrimSuffix(strings.TrimPrefix(value, "\""), "\""), + } + if rest == "" { + return eq, nil + } + + expressionLogicalRegex := regexp.MustCompile(`\s+(?PAND)\s+(?P.*)`) + submatches = expressionLogicalRegex.FindStringSubmatch(rest) + if submatches == nil { + return nil, fmt.Errorf("invalid sub-expression %q", strings.TrimSpace(rest)) + } + + logicalOperator := submatches[expressionLogicalRegex.SubexpIndex("operator")] + rest = submatches[expressionLogicalRegex.SubexpIndex("rest")] + + inner, err := parseFilter(rest) + if err != nil { + return nil, err + } + + switch logicalOperator { + case "AND": + return &andExpression{ + left: eq, + right: inner, + }, nil + case "OR": + return &orExpression{ + left: eq, + right: inner, + }, nil + default: + return nil, fmt.Errorf("invalid logical operator %q in expression %q", logicalOperator, filter) + } +} + +type pointFilter interface { + // filter returns true to keep the given point. + filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool +} + +type dateFilter struct { + startTime time.Time + endTime *time.Time +} + +func newIntervalFilter(interval *monitoringpb.TimeInterval) pointFilter { + filter := &dateFilter{ + startTime: interval.StartTime.AsTime(), + } + if interval.EndTime != nil { + filter.endTime = ptr.To(interval.EndTime.AsTime()) + } + return filter +} + +func (f *dateFilter) filter(_ *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + if f.endTime == nil { + return true + } + + pointStartTime := point.Interval.StartTime.AsTime() + pointEndTime := point.Interval.EndTime.AsTime() + if f.endTime.Before(pointStartTime) { + return false + } + + // Include equal end times as true. + return !pointEndTime.After(*f.endTime) +} + +type equalExpression struct { + object string + value string +} + +func (e *equalExpression) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + return e.value == extractValue(timeSeries, point, e.object) +} + +func extractValue(timeSeries *monitoringpb.TimeSeries, _ *monitoringpb.Point, object string) string { + switch object { + case "project": + return timeSeries.GetResource().GetLabels()["project_id"] + case "metric.type": + return timeSeries.GetMetric().GetType() + case "resource.type": + return timeSeries.GetResource().GetType() + } + userLabelsPrefix := "metric.labels." + if strings.HasPrefix(object, userLabelsPrefix) { + labelName := object[len(userLabelsPrefix):] + if metric := timeSeries.GetMetric(); metric != nil { + if val, ok := metric.GetLabels()[labelName]; ok { + return val + } + } + return "" + } + resourceLabelsPrefix := "resource.labels." + if strings.HasPrefix(object, resourceLabelsPrefix) { + labelName := object[len(resourceLabelsPrefix):] + if resource := timeSeries.GetResource(); resource != nil { + if val, ok := resource.GetLabels()[labelName]; ok { + return val + } + } + return "" + } + return "" +} + +type andExpression struct { + left, right pointFilter +} + +func (e *andExpression) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + return e.left.filter(timeSeries, point) && e.right.filter(timeSeries, point) +} + +type orExpression struct { + left, right pointFilter +} + +func (e *orExpression) filter(timeSeries *monitoringpb.TimeSeries, point *monitoringpb.Point) bool { + return e.left.filter(timeSeries, point) || e.right.filter(timeSeries, point) +} diff --git a/pkg/e2e/filter_test.go b/pkg/e2e/filter_test.go new file mode 100644 index 0000000000..93858f10e8 --- /dev/null +++ b/pkg/e2e/filter_test.go @@ -0,0 +1,191 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "testing" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/google/go-cmp/cmp" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestParseFilter(t *testing.T) { + point1 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 2}, + EndTime: ×tamppb.Timestamp{Seconds: 4}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 9.8}, + }, + } + + point2 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 6}, + EndTime: ×tamppb.Timestamp{Seconds: 8}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.5}, + }, + } + + timeSeries := &monitoringpb.TimeSeries{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{ + "foo": "bar", + }, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1, point2}, + } + + testCases := []struct { + name string + filter string + expected []*monitoringpb.Point + errExpected bool + }{ + { + name: "empty", + filter: "", + errExpected: true, + }, + { + name: "missing quotes", + filter: "project = example-project", + errExpected: true, + }, + { + name: "invalid object", + filter: "project_id = example-project", + errExpected: true, + }, + { + name: "invalid operation", + filter: "project_id == example-project", + errExpected: true, + }, + { + name: "right project", + filter: `project = "example-project"`, + expected: timeSeries.Points, + }, + { + name: "wrong project", + filter: `project = "example"`, + }, + { + name: "right metric type", + filter: `metric.type = "prometheus.googleapis.com/metric1/gauge"`, + expected: timeSeries.Points, + }, + { + name: "wrong metric type", + filter: `metric.type = "prometheus.googleapis.com/up/gauge"`, + }, + { + name: "right resource type", + filter: `resource.type = "prometheus_target"`, + expected: timeSeries.Points, + }, + { + name: "wrong resource type", + filter: `resource.type = "prometheus"`, + }, + { + name: "right metric label value", + filter: `metric.labels.foo = "bar"`, + expected: timeSeries.Points, + }, + { + name: "wrong metric label value", + filter: `metric.labels.foo = "baz"`, + }, + { + name: "missing metric label", + filter: `metric.labels.bar = "foo"`, + }, + { + name: "right resource label value", + filter: `resource.labels.project_id = "example-project"`, + expected: timeSeries.Points, + }, + { + name: "wrong resource label value", + filter: `resource.labels.project_id = "example"`, + }, + { + name: "missing resource label", + filter: `resource.labels.project = "example-project"`, + }, + { + name: "and expression both true", + filter: `project = "example-project" AND resource.type = "prometheus_target"`, + expected: timeSeries.Points, + }, + { + name: "and expression left true", + filter: `project = "example-project" AND resource.type = "prometheus"`, + }, + { + name: "and expression right true", + filter: `project = "example" AND resource.type = "prometheus_target"`, + }, + { + name: "and expression none true", + filter: `project = "example" AND resource.type = "prometheus"`, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + f, err := parseFilter(tc.filter) + if err != nil { + if tc.errExpected { + return + } + t.Fatal("parse filter failed", err) + } else if tc.errExpected { + t.Fatal("expected parse error", tc.filter) + } + var points []*monitoringpb.Point + for _, p := range timeSeries.Points { + if f.filter(timeSeries, p) { + points = append(points, p) + } + } + if diff := cmp.Diff(tc.expected, points, protocmp.Transform()); diff != "" { + t.Fatalf("expected points (-want, +got) %s", diff) + } + }) + } +} diff --git a/pkg/e2e/metric_database.go b/pkg/e2e/metric_database.go new file mode 100644 index 0000000000..20fb0a719c --- /dev/null +++ b/pkg/e2e/metric_database.go @@ -0,0 +1,302 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "errors" + "fmt" + "strings" + "sync" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + "golang.org/x/exp/maps" + metricpb "google.golang.org/genproto/googleapis/api/metric" + "google.golang.org/protobuf/proto" +) + +type MetricDatabase struct { + mtx sync.RWMutex + timeSeriesByProject map[string][]*monitoringpb.TimeSeries +} + +func NewMetricDatabase() *MetricDatabase { + return &MetricDatabase{ + timeSeriesByProject: make(map[string][]*monitoringpb.TimeSeries), + } +} + +func (db *MetricDatabase) All() []*monitoringpb.TimeSeries { + db.mtx.RLock() + defer db.mtx.RUnlock() + var timeSeries []*monitoringpb.TimeSeries + for _, timeSeriesFromProject := range db.timeSeriesByProject { + timeSeries = append(timeSeries, timeSeriesFromProject...) + } + return timeSeries +} + +func (db *MetricDatabase) Get(project string) []*monitoringpb.TimeSeries { + db.mtx.RLock() + defer db.mtx.RUnlock() + return db.timeSeriesByProject[project] +} + +func (db *MetricDatabase) Insert(project string, timeSeries []*monitoringpb.TimeSeries) error { + db.mtx.RLock() + defer db.mtx.RUnlock() + var errs []error + for _, timeSeriesAdd := range timeSeries { + if len(timeSeriesAdd.Points) == 0 { + errs = append(errs, fmt.Errorf("empty time series %q", timeSeriesAdd.GetMetric().GetType())) + } + + timeSeriesList, ok := db.timeSeriesByProject[project] + if !ok { + db.timeSeriesByProject[project] = []*monitoringpb.TimeSeries{timeSeriesAdd} + continue + } + + found := false + for _, timeSeries := range timeSeriesList { + if isTimeSeriesSame(timeSeries, timeSeriesAdd) { + found = true + for i, newPoint := range timeSeriesAdd.Points { + newPointInterval := newPoint.GetInterval() + if timeSeries.GetMetricKind() == metricpb.MetricDescriptor_GAUGE { + if newPointInterval.GetStartTime() == nil { + newPointInterval.StartTime = newPointInterval.EndTime + } else if !proto.Equal(newPointInterval.GetStartTime(), newPointInterval.GetEndTime()) { + errs = append(errs, fmt.Errorf("time series %s point %d gauge start time and end time must be same", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + } + if newPointInterval.GetStartTime() == nil { + errs = append(errs, fmt.Errorf("time series %s point %d missing start time", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + if newPointInterval.GetEndTime() == nil { + errs = append(errs, fmt.Errorf("time series %s point %d missing end time", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + + newPointStart := newPointInterval.GetStartTime().AsTime() + newPointEnd := newPointInterval.GetEndTime().AsTime() + if newPointStart.After(newPointEnd) { + errs = append(errs, fmt.Errorf("time series %s point %d start time after end time", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + + lastPoint := LatestPoint(timeSeries) + + if newPointStart.Before(lastPoint.GetInterval().GetStartTime().AsTime()) { + errs = append(errs, fmt.Errorf("time series %s point %d new start time before last start time", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + if newPointEnd.Before(lastPoint.GetInterval().GetEndTime().AsTime()) { + errs = append(errs, fmt.Errorf("time series %s point %d new end time before last end time", timeSeriesAdd.GetMetric().GetType(), i)) + continue + } + + timeSeries.Points = append([]*monitoringpb.Point{newPoint}, timeSeries.Points...) + } + break + } + } + + if !found { + db.timeSeriesByProject[project] = append(timeSeriesList, timeSeriesAdd) + } + } + + return errors.Join(errs...) +} + +func LatestPoint(timeSeries *monitoringpb.TimeSeries) *monitoringpb.Point { + points := timeSeries.GetPoints() + if len(points) == 0 { + return nil + } + return points[len(points)-1] +} + +// func (db *MetricDatabase) Querier(mint, maxt int64) (storage.Querier, error) { +// intervalFilter := newIntervalFilter(&monitoringpb.TimeInterval{ +// StartTime: ×tamppb.Timestamp{ +// Seconds: mint, +// }, +// EndTime: ×tamppb.Timestamp{ +// Seconds: maxt, +// }, +// }) +// return &storage.MockQuerier{ +// SelectMockFunction: func(sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet { +// filter := &andExpression{ +// left: intervalFilter, +// right: &matchersFilter{ +// matchers: matchers, +// }, +// } +// return ToSeriesSet(runFilter(db.All(), filter)) +// }, +// }, nil +// } + +type MetricCollector struct { + logger logr.Logger + metricTypePrefix string + metricDatabase *MetricDatabase +} + +func NewMetricCollector(logger logr.Logger, metricTypePrefix string, metricDatabase *MetricDatabase) prometheus.Collector { + return &MetricCollector{ + logger: logger, + metricTypePrefix: metricTypePrefix, + metricDatabase: metricDatabase, + } +} + +func (collector *MetricCollector) Describe(_ chan<- *prometheus.Desc) { + // Tell the Prometheus registry that the metrics are dynamically generated. +} + +func (collector *MetricCollector) Collect(ch chan<- prometheus.Metric) { + for _, timeSeries := range collector.metricDatabase.All() { + timeSeriesName := timeSeries.GetMetric().GetType() + point := LatestPoint(timeSeries) + switch timeSeries.GetValueType() { + case metricpb.MetricDescriptor_DOUBLE: + metricName, metricType := extractMetricMetadata(collector.metricTypePrefix, timeSeriesName) + labels := prometheus.Labels(Labels(timeSeries)) + switch metricType { + case "gauge": + gauge := prometheus.NewGauge(prometheus.GaugeOpts{ + Name: metricName, + ConstLabels: labels, + }) + gauge.Set(point.Value.GetDoubleValue()) + ch <- gauge + case "counter": + counter := prometheus.NewCounter(prometheus.CounterOpts{ + Name: metricName, + ConstLabels: labels, + }) + counter.Add(point.Value.GetDoubleValue()) + ch <- counter + default: + collector.logger.Info("unsupported metric type", "time_series", timeSeriesName, "metric_name", metricName, "metric_type", metricType) + } + case metricpb.MetricDescriptor_DISTRIBUTION: + collector.logger.Info("unsupported metric distribution", "time_series", timeSeriesName) + } + } +} + +func extractMetricMetadata(prefix string, name string) (metricType string, metricName string) { + if !strings.HasPrefix(name, prefix) { + return "", "" + } + meta := name[len(prefix)+1:] + split := strings.Split(meta, "/") + if len(split) != 2 { + return "", "" + } + return split[0], split[1] +} + +// func ToSeries(timeSeries *monitoringpb.TimeSeries) storage.Series { +// var timestamps []int64 +// var values []float64 +// for _, point := range timeSeries.GetPoints() { +// timestamps = append(timestamps, point.GetInterval().GetStartTime().GetSeconds()) +// timestamps = append(timestamps, point.GetInterval().GetEndTime().GetSeconds()) +// doubleValue := point.Value.GetDoubleValue() +// // Add value twice, once for the start time and once for the end time. +// values = append(values, doubleValue, doubleValue) +// } +// return storage.MockSeries(timestamps, values, maps.Values(Labels(timeSeries))) +// } + +func Labels(timeSeries *monitoringpb.TimeSeries) map[string]string { + labels := make(map[string]string) + if resource := timeSeries.GetResource(); resource != nil { + maps.Copy(labels, resource.GetLabels()) + } + if metric := timeSeries.GetMetric(); metric != nil { + maps.Copy(labels, metric.GetLabels()) + } + return labels +} + +// func ToSeriesSet(timeSeries []*monitoringpb.TimeSeries) storage.SeriesSet { +// var series []storage.Series +// for _, ts := range timeSeries { +// series = append(series, ToSeries(ts)) +// } +// return &mockSeriesSet{series: series} +// } + +// type mockSeriesSet struct { +// series []storage.Series +// index uint +// } + +// func (s *mockSeriesSet) Next() bool { +// if len(s.series) >= int(s.index) { +// return false +// } +// s.index++ +// return true +// } + +// func (s *mockSeriesSet) At() storage.Series { +// return s.series[s.index] +// } + +// func (s *mockSeriesSet) Err() error { +// return nil +// } + +// func (s *mockSeriesSet) Warnings() storage.Warnings { +// return nil +// } + +// type matchersFilter struct { +// matchers []*labels.Matcher +// } + +// func (f *matchersFilter) filter(timeSeries *monitoringpb.TimeSeries, _ *monitoringpb.Point) bool { +// for _, matcher := range f.matchers { +// if matcher.Matches(getLabelValue(timeSeries, matcher.Name)) { +// return true +// } +// } +// return false +// } + +// func getLabelValue(timeSeries *monitoringpb.TimeSeries, labelName string) string { +// if metric := timeSeries.GetMetric(); metric != nil { +// if val, ok := metric.GetLabels()[labelName]; ok { +// return val +// } +// } +// if resource := timeSeries.GetResource(); resource != nil { +// if val, ok := resource.GetLabels()[labelName]; ok { +// return val +// } +// } +// return "" +// } diff --git a/pkg/e2e/metric_database_test.go b/pkg/e2e/metric_database_test.go new file mode 100644 index 0000000000..c23ea83dbf --- /dev/null +++ b/pkg/e2e/metric_database_test.go @@ -0,0 +1,21 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +// import "testing" + +// func TestCollector(t *testing.T) { +// "prometheus.googleapis.com/always_one/gauge" +// } diff --git a/pkg/e2e/metric_service.go b/pkg/e2e/metric_service.go new file mode 100644 index 0000000000..f017462e27 --- /dev/null +++ b/pkg/e2e/metric_service.go @@ -0,0 +1,113 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "errors" + "fmt" + "maps" + "strings" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "google.golang.org/protobuf/types/known/emptypb" +) + +// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/projects.timeSeries/create +const defaultMaxTimeSeriesPerRequest = 200 + +func NewFakeMetricServer(db *MetricDatabase) monitoringpb.MetricServiceServer { + return newFakeMetricServer(db) +} + +func newFakeMetricServer(db *MetricDatabase) *fakeMetricServer { + return &fakeMetricServer{ + db: db, + maxTimeSeriesPerRequest: defaultMaxTimeSeriesPerRequest, + } +} + +type fakeMetricServer struct { + monitoringpb.UnimplementedMetricServiceServer + db *MetricDatabase + maxTimeSeriesPerRequest int +} + +func (fms *fakeMetricServer) CreateTimeSeries(_ context.Context, req *monitoringpb.CreateTimeSeriesRequest) (*emptypb.Empty, error) { + if req == nil { + return nil, errors.New("nil request") + } + if len(req.GetTimeSeries()) < 1 { + return nil, errors.New("there are no time series to add") + } + if amount := len(req.GetTimeSeries()); amount > fms.maxTimeSeriesPerRequest { + return nil, fmt.Errorf("exceeded the max number of time series, %d vs %d", amount, fms.maxTimeSeriesPerRequest) + } + if !strings.HasPrefix(req.GetName(), "projects/") { + return nil, fmt.Errorf("only projects are supported, found %q", req.GetName()) + } + + err := fms.db.Insert(req.GetName(), req.GetTimeSeries()) + return &emptypb.Empty{}, err +} + +func isTimeSeriesSame(left, right *monitoringpb.TimeSeries) bool { + if left == nil && right == nil { + return true + } + if left == nil || right == nil { + return false + } + + leftMetric := left.GetMetric() + rightMetric := right.GetMetric() + if leftMetric.GetType() != rightMetric.GetType() || !maps.Equal(leftMetric.GetLabels(), leftMetric.GetLabels()) { + return false + } + + leftResource := left.GetResource() + rightResource := right.GetResource() + return leftResource.GetType() == rightResource.GetType() && maps.Equal(leftResource.GetLabels(), rightResource.GetLabels()) +} + +func (fms *fakeMetricServer) ListTimeSeries(_ context.Context, req *monitoringpb.ListTimeSeriesRequest) (*monitoringpb.ListTimeSeriesResponse, error) { + if req == nil { + return nil, errors.New("nil request") + } + if req.Aggregation != nil || req.SecondaryAggregation != nil { + return nil, errors.New("fake metric server does not support aggregation") + } + if req.Interval == nil { + return nil, errors.New("time interval required") + } + if req.View == monitoringpb.ListTimeSeriesRequest_HEADERS { + return nil, errors.New("header view is not supported") + } + + expressionFilter, err := parseFilter(req.Filter) + if err != nil { + return nil, err + } + + filter := andExpression{ + left: newIntervalFilter(req.Interval), + right: expressionFilter, + } + + timeSeriesToReturn := runFilter(fms.db.Get(req.Name), &filter) + return &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: timeSeriesToReturn, + }, nil +} diff --git a/pkg/e2e/metric_service_test.go b/pkg/e2e/metric_service_test.go new file mode 100644 index 0000000000..5b2a509509 --- /dev/null +++ b/pkg/e2e/metric_service_test.go @@ -0,0 +1,782 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "reflect" + "testing" + + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "github.com/google/go-cmp/cmp" + metricpb "google.golang.org/genproto/googleapis/api/metric" + monitoredrespb "google.golang.org/genproto/googleapis/api/monitoredres" + "google.golang.org/protobuf/testing/protocmp" + "google.golang.org/protobuf/types/known/timestamppb" +) + +// Returns true if every field in TimeSeries a is deeply equal to TimeSeries b +// ignoring the Points field. False otherwise. +func timeSeriesEqualsExceptPoints(a *monitoringpb.TimeSeries, b *monitoringpb.TimeSeries) bool { + tmp := a.Points + a.Points = b.Points + isEqual := reflect.DeepEqual(a, b) + a.Points = tmp + return isEqual +} + +func TestCreateTimeSeriesBadInput(t *testing.T) { + ctx := context.Background() + db := NewMetricDatabase() + fms := fakeMetricServer{ + maxTimeSeriesPerRequest: 1, + db: db, + } + projectName := "projects/1234" + // add a time series to the FakeMetricServer so that + // TestAddPointInPast will fail as expected + timeSeries := []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }} + createTimeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: timeSeries, + } + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + // these are the subtests + tests := []*struct { + desc string + requests []*monitoringpb.CreateTimeSeriesRequest + }{ + { + desc: "TestNil", + requests: []*monitoringpb.CreateTimeSeriesRequest{nil}, + }, + { + desc: "TestExceedMaxTimeSeriesPerRequest", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + // Note: the TimeSeries are empty here since the check for exceeding + // the max timeseries in a requet happens before we verify + // data the data in the TimeSeries. + TimeSeries: []*monitoringpb.TimeSeries{{}, {}}, + }, + }, + }, + { + desc: "TestNoTimeSeriesToAdd", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + }, + }, + }, + { + desc: "TestNoPointInTimeSeriesToAdd", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + }}, + }, + }, + }, + { + desc: "TestAddPointInPast", + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + for _, request := range test.requests { + _, err := fms.CreateTimeSeries(ctx, request) + if err == nil { + t.Errorf("expected an error for %q", test.desc) + } + } + }) + } +} + +func TestCreateTimeSeries(t *testing.T) { + ctx := context.Background() + db := NewMetricDatabase() + fms := newFakeMetricServer(db) + projectName := "projects/1234" + + // these are the subtests + tests := []*struct { + desc string + requests []*monitoringpb.CreateTimeSeriesRequest + // index we expect the newly added timeseries to be in the fake metric server + timeSeriesIndexToCheck []int + // index we expect the newly added point to be in the fake metric server + pointsIndexToCheck []int + }{ + // This test adds a new time series with a new project id to the fake metric server. + // It then adds a new time series to the same project. + // It then adds a new point to the second time series. + { + desc: "TestCreateTimeSeries-NewProject-NewTimeSeries-NewPoint", + timeSeriesIndexToCheck: []int{0, 1, 1}, + pointsIndexToCheck: []int{0, 0, 0}, + requests: []*monitoringpb.CreateTimeSeriesRequest{ + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job2", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + { + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{{ + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job2", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 3}, + EndTime: ×tamppb.Timestamp{Seconds: 4}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + for i, request := range test.requests { + response, err := fms.CreateTimeSeries(ctx, request) + if err != nil || response == nil { + t.Errorf("unexpected error: %s", err) + } + if !timeSeriesEqualsExceptPoints( + request.TimeSeries[0], + db.Get(projectName)[test.timeSeriesIndexToCheck[i]], + ) { + t.Errorf( + "expected %+v and got %+v. Note: the points were not compared", + request.TimeSeries[0], + db.Get(projectName)[test.timeSeriesIndexToCheck[i]], + ) + } + if !reflect.DeepEqual( + request.TimeSeries[0].Points[0], + db.Get(projectName)[test.timeSeriesIndexToCheck[i]].Points[test.pointsIndexToCheck[i]], + ) { + t.Errorf( + "expected %+v and got %+v", + request.TimeSeries[0].Points[0], + db.Get(projectName)[test.timeSeriesIndexToCheck[i]].Points[test.pointsIndexToCheck[i]], + ) + } + } + }) + } +} + +func TestCreateTimeSeriesTwoSeries(t *testing.T) { + db := NewMetricDatabase() + fms := newFakeMetricServer(db) + projectName := "projects/1234" + + request := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }, + { + Resource: &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe1", + "cluster": "foo-cluster", + "namespace": "", + "job": "job1", + "instance": "instance1", + }, + }, + Metric: &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + }, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + }}, + }, + }, + } + response, err := fms.CreateTimeSeries(context.TODO(), request) + if err != nil || response == nil { + t.Errorf("unexpected error: %s", err) + } + for i := range request.TimeSeries { + if !reflect.DeepEqual(request.TimeSeries[i], db.Get(projectName)[i]) { + t.Errorf("expected %+v and got %+v", request.TimeSeries[i], db.Get(projectName)[i]) + } + } +} + +func TestListTimeSeriesBadInput(t *testing.T) { + db := NewMetricDatabase() + fms := newFakeMetricServer(db) + projectName := "projects/1234" + filter := "metric.type = \"prometheus.googleapis.com/metric1/gauge\"" + + // these are the subtests + tests := []*struct { + desc string + request *monitoringpb.ListTimeSeriesRequest + }{ + { + desc: "TestListTimeSeriesNilRequest", + request: nil, + }, + {}, + { + desc: "TestListTimeSeriesAggregation", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Aggregation: &monitoringpb.Aggregation{}, + Filter: filter, + }, + }, + { + desc: "TestListTimeSeriesNoInterval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + }, + }, + { + desc: "TestListTimeSeriesHeadersView", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + View: monitoringpb.ListTimeSeriesRequest_HEADERS, + }, + }, + { + desc: "TestListTimeSeriesMalformedFilter", + request: &monitoringpb.ListTimeSeriesRequest{ + + Name: projectName, + Filter: "metric.type = \"prometheus-target\" AND metric.labels.location = ", + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + }, + }, + } + + for _, test := range tests { + t.Run(test.desc, func(t *testing.T) { + response, err := fms.ListTimeSeries(context.TODO(), test.request) + if err == nil || response != nil { + t.Errorf("expected an error for %q", test.desc) + } + }) + } +} + +func TestListTimeSeries(t *testing.T) { + db := NewMetricDatabase() + fms := newFakeMetricServer(db) + projectName := "projects/1234" + filter := "metric.type = \"prometheus.googleapis.com/metric1/gauge\" AND project = \"example-project\"" + + point1 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 1}, + EndTime: ×tamppb.Timestamp{Seconds: 2}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 0.6}, + }, + } + + point2 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 4}, + EndTime: ×tamppb.Timestamp{Seconds: 5}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 9.8}, + }, + } + + point3 := &monitoringpb.Point{ + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 11}, + EndTime: ×tamppb.Timestamp{Seconds: 18}, + }, + Value: &monitoringpb.TypedValue{ + Value: &monitoringpb.TypedValue_DoubleValue{DoubleValue: 1.4}, + }, + } + + resource1 := &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "default", + "job": "job1", + "instance": "instance1", + }, + } + + resource2 := &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "default", + "job": "job2", + "instance": "instance1", + }, + } + + resource3 := &monitoredrespb.MonitoredResource{ + Type: "prometheus_target", + Labels: map[string]string{ + "project_id": "example-project2", + "location": "europe", + "cluster": "foo-cluster", + "namespace": "default", + "job": "job1", + "instance": "instance1", + }, + } + + metric := &metricpb.Metric{ + Type: "prometheus.googleapis.com/metric1/gauge", + Labels: map[string]string{"k1": "v1"}, + } + + timeSeriesJob1 := &monitoringpb.TimeSeries{ + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + } + + timeSeriesJob2 := &monitoringpb.TimeSeries{ + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + } + + timeSeriesJob3 := &monitoringpb.TimeSeries{ + Resource: resource3, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point3}, + } + + timeSeries := []*monitoringpb.TimeSeries{timeSeriesJob1, timeSeriesJob2, timeSeriesJob3} + createTimeSeriesRequest := &monitoringpb.CreateTimeSeriesRequest{ + Name: projectName, + TimeSeries: timeSeries, + } + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + timeSeriesJob1Point2 := &monitoringpb.TimeSeries{ + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2}, + } + + createTimeSeriesRequest.TimeSeries = []*monitoringpb.TimeSeries{timeSeriesJob1Point2} + if _, err := fms.CreateTimeSeries(context.TODO(), createTimeSeriesRequest); err != nil { + t.Fatalf("create time series: %s", err) + } + + testCases := []*struct { + desc string + request *monitoringpb.ListTimeSeriesRequest + expected *monitoringpb.ListTimeSeriesResponse + }{ + { + desc: "filter in range short interval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 3}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + { + desc: "filter in range wide interval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 6}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2, point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + { + desc: "interval out of range", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 10}, + EndTime: ×tamppb.Timestamp{Seconds: 11}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{}, + }, + }, + { + desc: "filter within point interval", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: filter, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 12}, + EndTime: ×tamppb.Timestamp{Seconds: 14}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource3, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point3}, + }, + }, + }, + }, + { + desc: "filter project", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: "project = \"example-project\"", + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 6}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2, point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + { + desc: "complex filter", + request: &monitoringpb.ListTimeSeriesRequest{ + Name: projectName, + Filter: `resource.type = "prometheus_target" AND resource.labels.project_id = "example-project" AND resource.labels.location = "europe" AND resource.labels.cluster = "foo-cluster" AND resource.labels.namespace = "default" AND metric.type = "prometheus.googleapis.com/metric1/gauge"`, + Interval: &monitoringpb.TimeInterval{ + StartTime: ×tamppb.Timestamp{Seconds: 0}, + EndTime: ×tamppb.Timestamp{Seconds: 6}, + }, + }, + expected: &monitoringpb.ListTimeSeriesResponse{ + TimeSeries: []*monitoringpb.TimeSeries{ + { + Resource: resource1, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point2, point1}, + }, + { + Resource: resource2, + Metric: metric, + MetricKind: metricpb.MetricDescriptor_GAUGE, + ValueType: metricpb.MetricDescriptor_DOUBLE, + Points: []*monitoringpb.Point{point1}, + }, + }, + }, + }, + } + + ctx := context.Background() + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + response, err := fms.ListTimeSeries(ctx, tc.request) + if err != nil { + t.Errorf("unexpected error: %s", err) + } + if diff := cmp.Diff(tc.expected, response, protocmp.Transform()); diff != "" { + t.Errorf("expected response (-want, +got) %s", diff) + } + }) + } +} diff --git a/pkg/export/export.go b/pkg/export/export.go index a4d3fbd6b6..6f91a90ea2 100644 --- a/pkg/export/export.go +++ b/pkg/export/export.go @@ -29,7 +29,7 @@ import ( "time" monitoring "cloud.google.com/go/monitoring/apiv3/v2" - monitoring_pb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" + "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb" "github.com/go-kit/log" "github.com/go-kit/log/level" gax "github.com/googleapis/gax-go/v2" @@ -511,7 +511,7 @@ func (e *Exporter) Export(metadata MetadataFunc, batch []record.RefSample, exemp e.triggerNext() } -func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool { +func sampleInRange(sample *monitoringpb.TimeSeries, start, end time.Time) bool { // A sample has exactly one point in the time series. The start timestamp may be unset for gauges. if s := sample.Points[0].Interval.StartTime; s != nil && s.AsTime().Before(start) { return false @@ -522,7 +522,7 @@ func sampleInRange(sample *monitoring_pb.TimeSeries, start, end time.Time) bool return true } -func (e *Exporter) enqueue(hash uint64, sample *monitoring_pb.TimeSeries) { +func (e *Exporter) enqueue(hash uint64, sample *monitoringpb.TimeSeries) { idx := hash % uint64(len(e.shards)) e.shards[idx].enqueue(hash, sample) } @@ -863,7 +863,7 @@ type batch struct { logger log.Logger maxSize uint - m map[string][]*monitoring_pb.TimeSeries + m map[string][]*monitoringpb.TimeSeries shards []*shard oneFull bool total int @@ -876,7 +876,7 @@ func newBatch(logger log.Logger, shardsCount uint, maxSize uint) *batch { return &batch{ logger: logger, maxSize: maxSize, - m: make(map[string][]*monitoring_pb.TimeSeries, 1), + m: make(map[string][]*monitoringpb.TimeSeries, 1), shards: make([]*shard, 0, shardsCount/2), } } @@ -886,12 +886,12 @@ func (b *batch) addShard(s *shard) { } // add a new sample to the batch. Must only be called after full() returned false. -func (b *batch) add(s *monitoring_pb.TimeSeries) { +func (b *batch) add(s *monitoringpb.TimeSeries) { pid := s.Resource.Labels[KeyProjectID] l, ok := b.m[pid] if !ok { - l = make([]*monitoring_pb.TimeSeries, 0, b.maxSize) + l = make([]*monitoringpb.TimeSeries, 0, b.maxSize) } l = append(l, s) b.m[pid] = l @@ -923,7 +923,7 @@ func (b *batch) empty() bool { // requests have completed and notifies the pending shards. func (b batch) send( ctx context.Context, - sendOne func(context.Context, *monitoring_pb.CreateTimeSeriesRequest, ...gax.CallOption) error, + sendOne func(context.Context, *monitoringpb.CreateTimeSeriesRequest, ...gax.CallOption) error, ) { // Set timeout so slow requests in the batch do not block overall progress indefinitely. sendCtx, cancel := context.WithTimeout(ctx, 30*time.Second) @@ -935,7 +935,7 @@ func (b batch) send( for pid, l := range b.m { wg.Add(1) - go func(pid string, l []*monitoring_pb.TimeSeries) { + go func(pid string, l []*monitoringpb.TimeSeries) { defer wg.Done() pendingRequests.Inc() @@ -945,7 +945,7 @@ func (b batch) send( // We do not retry any requests due to the risk of producing a backlog // that cannot be worked down, especially if large amounts of clients try to do so. - err := sendOne(sendCtx, &monitoring_pb.CreateTimeSeriesRequest{ + err := sendOne(sendCtx, &monitoringpb.CreateTimeSeriesRequest{ Name: fmt.Sprintf("projects/%s", pid), TimeSeries: l, })