Skip to content

Commit

Permalink
COO-525: fix the creation of the operator's ServiceMonitor resource w…
Browse files Browse the repository at this point in the history
…hen not running in the default namespace (#620)

* test: check operator metrics in Prometheus

Signed-off-by: Simon Pasquier <[email protected]>

* fix: add permission on services/finalizers

Signed-off-by: Simon Pasquier <[email protected]>

---------

Signed-off-by: Simon Pasquier <[email protected]>
  • Loading branch information
simonpasquier authored Nov 18, 2024
1 parent 4732d43 commit 8a10d14
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ metadata:
categories: Monitoring
certified: "false"
containerImage: observability-operator:0.4.2
createdAt: "2024-11-05T06:54:25Z"
createdAt: "2024-11-15T07:47:01Z"
description: A Go based Kubernetes operator to setup and manage highly available
Monitoring Stack using Prometheus, Alertmanager and Thanos Querier.
operators.operatorframework.io/builder: operator-sdk-v1.37.0
Expand Down Expand Up @@ -307,6 +307,13 @@ spec:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services/finalizers
verbs:
- patch
- update
- apiGroups:
- apps
resources:
Expand Down
7 changes: 7 additions & 0 deletions deploy/operator/observability-operator-cluster-role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- services/finalizers
verbs:
- patch
- update
- apiGroups:
- apps
resources:
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/operator/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func newRoleBindingForPrometheusRole(namespace string) *rbacv1.RoleBinding {
APIGroup: corev1.SchemeGroupVersion.Group,
Kind: "ServiceAccount",
Name: "prometheus-k8s",
Namespace: namespace,
Namespace: reconciler.OpenshiftMonitoringNamespace,
}},
RoleRef: rbacv1.RoleRef{
APIGroup: rbacv1.SchemeGroupVersion.Group,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controllers/operator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,12 @@ type resourceManager struct {
}

// RBAC for managing Prometheus Operator CRs
// The controller also needs update permission to the services/finalizers
// subresource to set the owner reference with blockOwnerDeletion=true on the
// ServiceMonitor resource.
//+kubebuilder:rbac:groups=monitoring.coreos.com,resources=servicemonitors,verbs=list;watch;create;update;delete;patch
//+kubebuilder:rbac:groups=rbac.authorization.k8s.io,resources=roles;rolebindings,verbs=list;create;update;patch
//+kubebuilder:rbac:groups="",resources=services/finalizers,verbs=update;patch

// RegisterWithManager registers the controller with Manager
func RegisterWithManager(mgr ctrl.Manager, namespace string) error {
Expand Down
11 changes: 10 additions & 1 deletion pkg/controllers/uiplugin/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,16 @@ func newKorrel8rService(name string, namespace string) *corev1.Service {

func newKorrel8rConfigMap(name string, namespace string, info UIPluginInfo) (*corev1.ConfigMap, error) {

korrel8rData := map[string]string{"Metric": "thanos-querier", "MetricAlert": "alertmanager-main", "Log": "logging-loki-gateway-http", "Netflow": "loki-gateway-http", "Trace": "tempo-platform-gateway", "MonitoringNs": "openshift-monitoring", "LoggingNs": OpenshiftLoggingNs, "NetobservNs": OpenshiftNetobservNs, "TracingNs": OpenshiftTracingNs}
korrel8rData := map[string]string{
"Metric": "thanos-querier",
"MetricAlert": "alertmanager-main",
"Log": "logging-loki-gateway-http",
"Netflow": "loki-gateway-http", "Trace": "tempo-platform-gateway",
"MonitoringNs": reconciler.OpenshiftMonitoringNamespace,
"LoggingNs": OpenshiftLoggingNs,
"NetobservNs": OpenshiftNetobservNs,
"TracingNs": OpenshiftTracingNs,
}

if info.LokiServiceNames[OpenshiftLoggingNs] != "" {
korrel8rData["Log"] = info.LokiServiceNames[OpenshiftLoggingNs]
Expand Down
13 changes: 8 additions & 5 deletions pkg/controllers/uiplugin/troubleshooting_panel.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

uiv1alpha1 "github.com/rhobs/observability-operator/pkg/apis/uiplugin/v1alpha1"
"github.com/rhobs/observability-operator/pkg/reconciler"
)

// Fixed resource names used by the troubleshooting panel plugin.
const (
// korrel8rSvcName is the name used for the korrel8r service.
korrel8rSvcName = "korrel8r"
// monitorClusterroleName is presumably the cluster role granting access to
// cluster monitoring data; its use is not visible here — TODO confirm.
monitorClusterroleName = "cluster-monitoring"
// alertmanagerRoleName is the Alertmanager view role; the role binding
// created for the plugin is named alertmanagerRoleName + "-rolebinding".
alertmanagerRoleName = "monitoring-alertmanager-view"
)

func createTroubleshootingPanelPluginInfo(plugin *uiv1alpha1.UIPlugin, namespace, name, image string, features []string) (*UIPluginInfo, error) {
troubleshootingPanelConfig := plugin.Spec.TroubleshootingPanel
korrel8rSvcName := "korrel8r"
monitorClusterroleName := "cluster-monitoring"
alertmanagerRoleName := "monitoring-alertmanager-view"
monitoringNamespace := "openshift-monitoring"

configYaml, err := marshalTroubleshootingPanelPluginConfig(troubleshootingPanelConfig)
if err != nil {
Expand Down Expand Up @@ -92,7 +95,7 @@ func createTroubleshootingPanelPluginInfo(plugin *uiv1alpha1.UIPlugin, namespace
},
ObjectMeta: metav1.ObjectMeta{
Name: alertmanagerRoleName + "-rolebinding",
Namespace: monitoringNamespace,
Namespace: reconciler.OpenshiftMonitoringNamespace,
},
Subjects: []rbacv1.Subject{
{
Expand Down
6 changes: 6 additions & 0 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
)

const (
// OpenshiftMonitoringNamespace is the namespace in which the OpenShift
// monitoring components are deployed.
OpenshiftMonitoringNamespace = "openshift-monitoring"
)

// This interface is used by the resourceManagers to reconcile the resources they
// watch. If any component needs special treatment in the reconcile loop, create
// a new type that implements this interface.
Expand Down
195 changes: 178 additions & 17 deletions test/e2e/framework/assertions.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -267,18 +271,23 @@ func (f *Framework) GetOperatorPod(t *testing.T) *v1.Pod {
pods := v1.PodList{}
err = f.K8sClient.List(context.Background(), &pods, listOptions...)
if err != nil {
t.Error("failed to get opeator pods: ", err)
t.Errorf("failed to get operator pods: %s", err)
}

if len(pods.Items) != 1 {
t.Error("Expected 1 operator pod but got:", len(pods.Items))
t.Errorf("Expected 1 operator pod but got: %d", len(pods.Items))
}

return &pods.Items[0]
}

type HTTPOptions struct {
scheme string
scheme string
port string
method string
path string
body string
timeout time.Duration
}

func WithHTTPS() func(*HTTPOptions) {
Expand All @@ -287,13 +296,39 @@ func WithHTTPS() func(*HTTPOptions) {
}
}

// WithPort returns an option that sets the port used for the request.
func WithPort(p string) func(*HTTPOptions) {
	return func(opts *HTTPOptions) { opts.port = p }
}

// WithMethod returns an option that sets the HTTP method of the request.
func WithMethod(m string) func(*HTTPOptions) {
	return func(opts *HTTPOptions) { opts.method = m }
}

// WithPath returns an option that sets the URL path of the request.
func WithPath(p string) func(*HTTPOptions) {
	return func(opts *HTTPOptions) { opts.path = p }
}

// WithBody returns an option that sets the request body.
func WithBody(b string) func(*HTTPOptions) {
	return func(opts *HTTPOptions) { opts.body = b }
}

// GetPodMetrics requests the /metrics endpoint from the pod.
func (f *Framework) GetPodMetrics(pod *v1.Pod, opts ...func(*HTTPOptions)) ([]byte, error) {
var (
pollErr error
b []byte
)
opts = append(opts, WithPath("/metrics"), WithPort("8080"))
if err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, DefaultTestTimeout, true, func(ctx context.Context) (bool, error) {
b, pollErr = f.getPodMetrics(ctx, pod, opts...)
b, pollErr = f.getRequest(ctx, pod, opts...)
if pollErr != nil {
return false, nil
}
Expand All @@ -306,7 +341,124 @@ func (f *Framework) GetPodMetrics(pod *v1.Pod, opts ...func(*HTTPOptions)) ([]by
return b, nil
}

func (f *Framework) getPodMetrics(ctx context.Context, pod *v1.Pod, opts ...func(*HTTPOptions)) ([]byte, error) {
// AssertPromQLResult evaluates the PromQL expression against the in-cluster
// Prometheus stack and hands the result to callback for additional checks.
//
// The query is attempted immediately and then every 20 seconds, for at most
// 3*DefaultTestTimeout. An attempt succeeds when the query returns without
// error and callback returns nil. Query failures are logged through t and
// retried; callback failures are retried silently.
//
// It returns nil as soon as one attempt succeeds. Otherwise the returned error
// wraps both the polling error and the error from the last attempt.
func (f *Framework) AssertPromQLResult(t *testing.T, expr string, callback func(model.Value) error) error {
	t.Helper()

	// lastErr keeps the failure of the most recent attempt so that it can be
	// reported once polling gives up.
	var lastErr error
	err := wait.PollUntilContextTimeout(context.Background(), 20*time.Second, 3*DefaultTestTimeout, true, func(ctx context.Context) (bool, error) {
		var v model.Value
		// Use the context provided by the poller so that an in-flight query is
		// cancelled when the overall deadline expires.
		v, lastErr = f.getPromQLResult(ctx, expr)
		if lastErr != nil {
			t.Logf("error from getPromQLResult(): %s", lastErr)
			return false, nil
		}

		lastErr = callback(v)
		return lastErr == nil, nil
	})
	if err != nil {
		return fmt.Errorf("failed to assert query %q: %w: %w", expr, err, lastErr)
	}

	return nil
}

// Copied from github.com/prometheus/client_golang/blob/api/prometheus/v1/api.go
//
// apiResponse is the JSON envelope decoded from the Prometheus query API
// response.
type apiResponse struct {
// Status is compared against "success" by getPromQLResult.
Status string `json:"status"`
// Result holds the decoded content of the "data" field.
Result queryResult `json:"data"`
// ErrorType and Error are reported when Status is not "success".
ErrorType string `json:"errorType"`
Error string `json:"error"`
Warnings []string `json:"warnings,omitempty"`
}

// queryResult is the "data" part of a query response. Its UnmarshalJSON
// method decodes the payload into v according to the reported result type.
type queryResult struct {
// NOTE(review): Type and Result are not populated by UnmarshalJSON; only v
// is set.
Type model.ValueType `json:"resultType"`
Result interface{} `json:"result"`

// The decoded value.
v model.Value
}

// UnmarshalJSON implements json.Unmarshaler.
//
// It reads the "resultType" discriminator, then decodes the raw "result"
// payload into the matching model type (scalar, vector or matrix) and stores
// it in qr.v. Any other result type yields an error and leaves qr.v untouched.
// For the supported types, qr.v is assigned even when decoding the payload
// fails; callers must check the returned error first.
func (qr *queryResult) UnmarshalJSON(b []byte) error {
	var envelope struct {
		Type   model.ValueType `json:"resultType"`
		Result json.RawMessage `json:"result"`
	}
	if err := json.Unmarshal(b, &envelope); err != nil {
		return err
	}

	switch envelope.Type {
	case model.ValScalar:
		scalar := new(model.Scalar)
		err := json.Unmarshal(envelope.Result, scalar)
		qr.v = scalar
		return err

	case model.ValVector:
		var vector model.Vector
		err := json.Unmarshal(envelope.Result, &vector)
		qr.v = vector
		return err

	case model.ValMatrix:
		var matrix model.Matrix
		err := json.Unmarshal(envelope.Result, &matrix)
		qr.v = matrix
		return err
	}

	return fmt.Errorf("unexpected value type %q", envelope.Type)
}

// getPromQLResult sends expr as a form-encoded POST request to the
// /api/v1/query endpoint on port 9090 of the first pod returned for the
// "prometheus-k8s" service in the "openshift-monitoring" namespace.
//
// It returns the decoded query value, or an error when no pod is found, the
// request fails, the response cannot be parsed, or the response status is not
// "success".
func (f *Framework) getPromQLResult(ctx context.Context, expr string) (model.Value, error) {
	promPods, err := f.getPodsForService("prometheus-k8s", "openshift-monitoring")
	switch {
	case err != nil:
		return nil, fmt.Errorf("failed to get prometheus pod: %w", err)
	case len(promPods) == 0:
		return nil, fmt.Errorf("no Prometheus pods found")
	}

	form := url.Values{"query": []string{expr}}
	requestOpts := []func(*HTTPOptions){
		WithPort("9090"),
		WithMethod("POST"),
		WithPath("/api/v1/query"),
		WithBody(form.Encode()),
	}

	payload, err := f.getRequest(ctx, &promPods[0], requestOpts...)
	if err != nil {
		return nil, fmt.Errorf("failed to query prometheus: %w", err)
	}

	var resp apiResponse
	if err := json.Unmarshal(payload, &resp); err != nil {
		return nil, fmt.Errorf("failed to parse prometheus response: %w", err)
	}

	if resp.Status == "success" {
		return resp.Result.v, nil
	}

	return nil, fmt.Errorf("%q: %s (%s)", expr, resp.ErrorType, resp.Error)
}

// getRequest makes an HTTP request to the pod via port-forward.
func (f *Framework) getRequest(ctx context.Context, pod *v1.Pod, opts ...func(*HTTPOptions)) ([]byte, error) {
var (
stopChan = make(chan struct{})
errChan = make(chan error, 1)
Expand All @@ -321,32 +473,40 @@ func (f *Framework) getPodMetrics(ctx context.Context, pod *v1.Pod, opts ...func
close(stopChan)
}()

err := f.StartPortForward(pod.Name, pod.Namespace, "8080", stopChan, errChan)
if err != nil {
return nil, fmt.Errorf("failed to start port-forwarding: %w", err)
}

httpOptions := HTTPOptions{
scheme: "http",
scheme: "http",
method: "GET",
timeout: 4 * time.Second,
}
for _, o := range opts {
o(&httpOptions)
}

// The /metrics endpoint shouldn't need more than 5 seconds to send a response.
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := f.StartPortForward(pod.Name, pod.Namespace, httpOptions.port, stopChan, errChan)
if err != nil {
return nil, fmt.Errorf("failed to start port-forwarding: %w", err)
}

ctx, cancel := context.WithTimeout(ctx, httpOptions.timeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s://localhost:8080/metrics", httpOptions.scheme), nil)
req, err := http.NewRequestWithContext(
ctx,
httpOptions.method,
httpOptions.scheme+"://"+path.Join(fmt.Sprintf("localhost:%s", httpOptions.port), httpOptions.path),
strings.NewReader(httpOptions.body),
)
if err != nil {
return nil, err
}
if req.Method == "POST" {
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
}

tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig = &tls.Config{
ServerName: fmt.Sprintf("observability-operator.%s.svc", pod.Namespace),
RootCAs: f.RootCA,
GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
fmt.Printf("client cert: %#v\n", f.MetricsClientCert)
return f.MetricsClientCert, nil
},
}
Expand All @@ -357,8 +517,9 @@ func (f *Framework) getPodMetrics(ctx context.Context, pod *v1.Pod, opts ...func
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
return nil, fmt.Errorf("invalid status code from %q: got %d", req.URL.String(), resp.StatusCode)
if resp.StatusCode/100 != 2 {
b, _ := io.ReadAll(resp.Body)
return nil, fmt.Errorf("invalid status code from %q: got %d (%q)", req.URL.String(), resp.StatusCode, string(b))
}

return io.ReadAll(resp.Body)
Expand Down
Loading

0 comments on commit 8a10d14

Please sign in to comment.