Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DRAFT] feat(e2e): use a fake metric service for e2e tests #988

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ endif
ifeq ($(KIND_PERSIST), 1)
E2E_DOCKER_ARGS += --env KIND_PERSIST=1
endif
E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic
E2E_DEPS:=config-reloader operator rule-evaluator go-synthetic fake-metric-service
REGISTRY_NAME=kind-registry
REGISTRY_PORT=5001
KIND_PARALLEL?=5
Expand Down
29 changes: 29 additions & 0 deletions cmd/fake-metric-service/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Stage "buildbase": Go toolchain image (pinned by digest) holding the module
# files, the cmd and pkg source trees, and the vendored dependencies under /app.
FROM google-go.pkg.dev/golang:1.22.3@sha256:efc6b824652199ede8bc25e2d5a57076e0e1ffbe90735dcbda3ee956fe33b612 as buildbase
WORKDIR /app
COPY go.mod go.mod
COPY go.sum go.sum
COPY cmd cmd
COPY pkg pkg
COPY vendor vendor

# Stage "appbase": compile the fake-metric-service binary from the vendored
# sources (-mod=vendor) with cgo enabled, GOEXPERIMENT=boringcrypto and the
# "boring" build tag.
FROM buildbase as appbase
RUN CGO_ENABLED=1 GOEXPERIMENT=boringcrypto \
go build -tags boring -mod=vendor -o fake-metric-service cmd/fake-metric-service/*.go

# Final image: distroless libc base (pinned by digest) containing only the
# compiled binary, which is also the entrypoint.
# NOTE(review): the libc variant is presumably required because the binary is
# built with CGO_ENABLED=1 - confirm.
FROM gke.gcr.io/gke-distroless/libc:gke_distroless_20240307.00_p0@sha256:4f834e207f2721977094aeec4c9daee7032c5daec2083c0be97760f4306e4f88
COPY --from=appbase /app/fake-metric-service /bin/fake-metric-service
ENTRYPOINT ["/bin/fake-metric-service"]
144 changes: 144 additions & 0 deletions cmd/fake-metric-service/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"errors"
	"flag"
	"net"
	"net/http"
	"os"
	"sync"
	"time"

	"cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/e2e"
	"github.com/GoogleCloudPlatform/prometheus-engine/pkg/export"
	"github.com/go-logr/logr"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	"go.uber.org/zap/zapcore"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/encoding/prototext"
	"google.golang.org/protobuf/proto"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
)

// main parses the command-line flags, installs the process-wide logger and
// hands control to run. It exits with status 1 when run reports an error.
func main() {
	verbosity := flag.Int("v", 0, "Logging verbosity")
	probeAddr := flag.String("addr", ":8080", "Address to serve probe statuses (e.g. /readyz and /livez) and /metrics")
	grpcAddr := flag.String("metric-service-addr", ":8081", "Address to serve a mock metric service server.")
	flag.Parse()

	// The verbosity flag is negated before being used as the zap level.
	lg := zap.New(zap.Level(zapcore.Level(-*verbosity)))
	ctrl.SetLogger(lg)

	err := run(signals.SetupSignalHandler(), lg, *probeAddr, *grpcAddr)
	if err == nil {
		return
	}
	lg.Error(err, "exit with error")
	os.Exit(1)
}

// run starts the fake metric service gRPC server on metricServiceAddr and an
// HTTP server on probeAddr that serves /readyz, /livez and /metrics, then
// blocks until ctx is cancelled or one of the servers fails.
//
// It returns the first error reported by either server, or nil once both
// servers have shut down after ctx was cancelled. When run returns early
// because one server failed, the other one is asked to stop as well.
func run(ctx context.Context, logger logr.Logger, probeAddr, metricServiceAddr string) error {
	listener, err := net.Listen("tcp", metricServiceAddr)
	if err != nil {
		return err
	}

	// Derive a cancellable context so that returning early (first error)
	// also triggers the shutdown goroutines below instead of leaking them.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var wg sync.WaitGroup
	// One slot per potential sender (gRPC Serve, HTTP ListenAndServe, HTTP
	// Shutdown) so no goroutine can block on send after run has returned.
	errs := make(chan error, 3)

	logger.Info("starting server...")
	metricDatabase := e2e.NewMetricDatabase()

	{
		server := grpc.NewServer(grpc.UnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
			now := time.Now()
			// Checked assertion: a non-proto request must not panic the
			// interceptor, it is simply logged without a payload.
			var data string
			if msg, ok := req.(proto.Message); ok {
				data = prototext.Format(msg)
			}
			logger.Info("grpc request", "method", info.FullMethod, "time", now, "data", data)
			resp, err := handler(ctx, req)
			logger.Info("grpc response", "method", info.FullMethod, "time", now, "duration", time.Since(now))
			if err != nil {
				logger.Error(err, "grpc failure", "method", info.FullMethod, "time", now)
			}
			return resp, err
		}))
		monitoringpb.RegisterMetricServiceServer(server, e2e.NewFakeMetricServer(metricDatabase))

		wg.Add(2)
		go func() {
			defer wg.Done()
			// Done() is only ever closed, never sent on, so it must be
			// received from directly; ranging over it never runs the body.
			<-ctx.Done()
			server.GracefulStop()
		}()
		go func() {
			defer wg.Done()
			// ErrServerStopped means the server was stopped before Serve
			// started, which is a normal shutdown rather than a failure.
			if err := server.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
				errs <- err
			}
		}()
	}

	{
		registry := prometheus.NewRegistry()
		registry.MustRegister(e2e.NewMetricCollector(logger, export.MetricTypePrefix, metricDatabase))

		mux := http.NewServeMux()
		mux.Handle("/readyz", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))
		mux.Handle("/livez", http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK)
		}))
		mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{Registry: registry}))

		server := &http.Server{
			Addr:    probeAddr,
			Handler: mux,
		}

		wg.Add(2)
		go func() {
			defer wg.Done()
			<-ctx.Done()
			// Start new context because ours is done.
			if err := server.Shutdown(context.Background()); err != nil {
				errs <- err
			}
		}()
		go func() {
			defer wg.Done()
			// ListenAndServe always returns a non-nil error; after Shutdown
			// it is ErrServerClosed, which signals a clean stop.
			if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
				errs <- err
			}
		}()
	}

	go func() {
		wg.Wait()
		close(errs)
	}()

	// Yields the first reported error, or nil once errs is closed after
	// every goroutine above has finished.
	return <-errs
}
25 changes: 9 additions & 16 deletions e2e/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

gcm "cloud.google.com/go/monitoring/apiv3/v2"
gcmpb "cloud.google.com/go/monitoring/apiv3/v2/monitoringpb"
"github.com/GoogleCloudPlatform/prometheus-engine/e2e/kube"
monitoringv1 "github.com/GoogleCloudPlatform/prometheus-engine/pkg/operator/apis/monitoring/v1"
Expand Down Expand Up @@ -79,9 +78,7 @@ func TestCollectorPodMonitoring(t *testing.T) {
},
}
t.Run("self-podmonitoring-ready", testEnsurePodMonitoringReady(ctx, kubeClient, pm))
if !skipGCM {
t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-podmon"))
}
t.Run("self-podmonitoring-gcm", testValidateCollectorUpMetrics(ctx, restConfig, kubeClient, "collector-podmon"))
}

func TestCollectorClusterPodMonitoring(t *testing.T) {
Expand Down Expand Up @@ -120,9 +117,7 @@ func TestCollectorClusterPodMonitoring(t *testing.T) {
},
}
t.Run("self-clusterpodmonitoring-ready", testEnsureClusterPodMonitoringReady(ctx, kubeClient, cpm))
if !skipGCM {
t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, kubeClient, "collector-cmon"))
}
t.Run("self-clusterpodmonitoring-gcm", testValidateCollectorUpMetrics(ctx, restConfig, kubeClient, "collector-cmon"))
}

func TestCollectorKubeletScraping(t *testing.T) {
Expand All @@ -138,9 +133,7 @@ func TestCollectorKubeletScraping(t *testing.T) {
t.Run("collector-operatorconfig", testCollectorOperatorConfig(ctx, kubeClient))

t.Run("enable-kubelet-scraping", testEnableKubeletScraping(ctx, kubeClient))
if !skipGCM {
t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, kubeClient))
}
t.Run("scrape-kubelet", testCollectorScrapeKubelet(ctx, restConfig, kubeClient))
}

// testCollectorDeployed does a high-level verification on whether the
Expand Down Expand Up @@ -440,12 +433,12 @@ func testEnableKubeletScraping(ctx context.Context, kubeClient client.Client) fu

// testValidateCollectorUpMetrics checks whether the scrape-time up metrics for all collector
// pods can be queried from GCM.
func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Client, job string) func(*testing.T) {
func testValidateCollectorUpMetrics(ctx context.Context, restConfig *rest.Config, kubeClient client.Client, job string) func(*testing.T) {
return func(t *testing.T) {
t.Log("checking for metrics in Cloud Monitoring")

// Wait for metric data to show up in Cloud Monitoring.
metricClient, err := gcm.NewMetricClient(ctx)
metricClient, err := newMetricClient(ctx, t, restConfig, kubeClient)
if err != nil {
t.Fatalf("create metric client: %s", err)
}
Expand Down Expand Up @@ -480,7 +473,7 @@ func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Clien
Filter: fmt.Sprintf(`
resource.type = "prometheus_target" AND
resource.labels.project_id = "%s" AND
resource.label.location = "%s" AND
resource.labels.location = "%s" AND
resource.labels.cluster = "%s" AND
resource.labels.namespace = "%s" AND
resource.labels.job = "%s" AND
Expand Down Expand Up @@ -519,12 +512,12 @@ func testValidateCollectorUpMetrics(ctx context.Context, kubeClient client.Clien
}

// testCollectorScrapeKubelet verifies that kubelet metric endpoints are successfully scraped.
func testCollectorScrapeKubelet(ctx context.Context, kubeClient client.Client) func(*testing.T) {
func testCollectorScrapeKubelet(ctx context.Context, restConfig *rest.Config, kubeClient client.Client) func(*testing.T) {
return func(t *testing.T) {
t.Log("checking for metrics in Cloud Monitoring")

// Wait for metric data to show up in Cloud Monitoring.
metricClient, err := gcm.NewMetricClient(ctx)
metricClient, err := newMetricClient(ctx, t, restConfig, kubeClient)
if err != nil {
t.Fatalf("create GCM metric client: %s", err)
}
Expand All @@ -548,7 +541,7 @@ func testCollectorScrapeKubelet(ctx context.Context, kubeClient client.Client) f
Filter: fmt.Sprintf(`
resource.type = "prometheus_target" AND
resource.labels.project_id = "%s" AND
resource.label.location = "%s" AND
resource.labels.location = "%s" AND
resource.labels.cluster = "%s" AND
resource.labels.job = "kubelet" AND
resource.labels.instance = "%s:%s" AND
Expand Down
54 changes: 39 additions & 15 deletions e2e/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ func CreateResources(ctx context.Context, kubeClient client.Client, deployOpts .
type DeployOption func(*deployOptions)

type deployOptions struct {
operatorNamespace string
publicNamespace string
projectID string
cluster string
location string
disableGCM bool
operatorNamespace string
publicNamespace string
projectID string
cluster string
location string
disableGCM bool
gcmEndpoint string
prometheusEndpoint string
}

func (opts *deployOptions) setDefaults() {
Expand Down Expand Up @@ -92,6 +94,18 @@ func WithDisableGCM(disableGCM bool) DeployOption {
}
}

// WithGCMEndpoint returns a DeployOption that stores gcmEndpoint in the
// gcmEndpoint field of the deploy options it is applied to.
func WithGCMEndpoint(gcmEndpoint string) DeployOption {
	apply := func(o *deployOptions) {
		o.gcmEndpoint = gcmEndpoint
	}
	return apply
}

// WithPrometheusEndpoint returns a DeployOption that stores prometheusEndpoint
// in the prometheusEndpoint field of the deploy options it is applied to.
func WithPrometheusEndpoint(prometheusEndpoint string) DeployOption {
	apply := func(o *deployOptions) {
		o.prometheusEndpoint = prometheusEndpoint
	}
	return apply
}

func createResources(ctx context.Context, kubeClient client.Client, normalizeFn func(client.Object) (client.Object, error)) error {
resources, err := resources(kubeClient.Scheme())
if err != nil {
Expand Down Expand Up @@ -130,16 +144,18 @@ func resources(scheme *runtime.Scheme) ([]client.Object, error) {
}

func normalizeDeamonSets(opts *deployOptions, obj *appsv1.DaemonSet) (client.Object, error) {
if !opts.disableGCM {
return obj, nil
}
if obj.GetName() != operator.NameCollector {
return obj, nil
}
for i := range obj.Spec.Template.Spec.Containers {
container := &obj.Spec.Template.Spec.Containers[i]
if container.Name == operator.CollectorPrometheusContainerName {
container.Args = append(container.Args, "--export.debug.disable-auth")
if opts.disableGCM || opts.gcmEndpoint != "" {
container.Args = append(container.Args, "--export.debug.disable-auth")
}
if opts.gcmEndpoint != "" {
container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint))
}
return obj, nil
}
}
Expand Down Expand Up @@ -169,15 +185,23 @@ func normalizeDeployments(opts *deployOptions, obj *appsv1.Deployment) (client.O
container.Args = append(container.Args, fmt.Sprintf("--public-namespace=%s", opts.publicNamespace))
}
case operator.NameRuleEvaluator:
if !opts.disableGCM {
break
}
container, err := kube.DeploymentContainer(obj, operator.RuleEvaluatorContainerName)
if err != nil {
return nil, fmt.Errorf("unable to find rule-evaluator %q container: %w", operator.RuleEvaluatorContainerName, err)
}
container.Args = append(container.Args, "--export.debug.disable-auth")
container.Args = append(container.Args, "--query.debug.disable-auth")
if opts.disableGCM || opts.gcmEndpoint != "" {
container.Args = append(container.Args, "--export.debug.disable-auth")
}
if opts.gcmEndpoint != "" {
container.Args = append(container.Args, fmt.Sprintf("--export.endpoint=%s", opts.gcmEndpoint))
}

if opts.disableGCM || opts.prometheusEndpoint != "" {
container.Args = append(container.Args, "--query.debug.disable-auth")
}
if opts.prometheusEndpoint != "" {
container.Args = append(container.Args, fmt.Sprintf("--query.target-url=%s", opts.prometheusEndpoint))
}
}
return obj, nil
}
Expand Down
Loading
Loading