fix: Clear EndpointStatuses for PodMonitoring without endpoints #1197

Merged
merged 1 commit on Dec 17, 2024
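This change fixes stale status reporting. Previously, updateTargetStatus only patched the PodMonitoring and ClusterPodMonitoring resources that appeared in the endpoint map built from the collectors' targets, so a resource whose scrape endpoints had been removed kept its old EndpointStatuses indefinitely. With this change, pollAndUpdate lists every PodMonitoring-type CR up front via the new fetchAllPodMonitorings helper (returning early when there are none, which replaces the list check dropped from shouldPoll), and updateTargetStatus patches an empty EndpointStatuses slice onto every CR that received no statuses in the current poll.

The following is a minimal, runnable sketch of that clearing pass; the types and names used here (status, clearStale, a withStatuses set keyed by name) are simplified stand-ins for illustration, not the operator's actual API.

package main

import "fmt"

// status is a stand-in for the PodMonitoring status object.
type status struct {
	EndpointStatuses []string
}

// clearStale resets the statuses of every resource that did not report any
// endpoint statuses during the current poll, mirroring the loop added at the
// end of updateTargetStatus in this PR.
func clearStale(all map[string]*status, withStatuses map[string]bool) {
	for name, st := range all {
		if !withStatuses[name] {
			// Patch an empty (non-nil) slice so stale entries are removed.
			st.EndpointStatuses = []string{}
		}
	}
}

func main() {
	all := map[string]*status{
		"prom-example-1": {EndpointStatuses: []string{"PodMonitoring/gmp-test/prom-example-1/metrics"}},
		"prom-example-2": {EndpointStatuses: []string{"PodMonitoring/gmp-test/prom-example-2/metrics"}},
	}
	// Only prom-example-2 had targets in this poll.
	clearStale(all, map[string]bool{"prom-example-2": true})
	fmt.Println(all["prom-example-1"].EndpointStatuses) // prints [], stale status cleared
	fmt.Println(all["prom-example-2"].EndpointStatuses) // unchanged
}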
61 changes: 45 additions & 16 deletions pkg/operator/target_status.go
@@ -128,7 +128,27 @@ func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer, httpC
return nil
}

// shouldPoll verifies if polling collectors is configured or necessary.
// fetchAllPodMonitorings fetches all ClusterPodMonitoring and PodMonitoring CRs deployed in the cluster. This excludes ClusterNodeMonitoring CRs.
func fetchAllPodMonitorings(ctx context.Context, kubeClient client.Client) ([]monitoringv1.PodMonitoringCRD, error) {
var combinedList []monitoringv1.PodMonitoringCRD
var podMonitoringList monitoringv1.PodMonitoringList
if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
return nil, err
}
for _, pm := range podMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
return nil, err
}
for _, pm := range clusterPodMonitoringList.Items {
combinedList = append(combinedList, &pm)
}
return combinedList, nil
}

// shouldPoll verifies if polling collectors is configured.
func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kubeClient client.Client) (bool, error) {
// Check if target status is enabled.
var config monitoringv1.OperatorConfig
@@ -141,19 +161,6 @@ func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kub
if !config.Features.TargetStatus.Enabled {
return false, nil
}

// No need to poll if there's no PodMonitorings.
var podMonitoringList monitoringv1.PodMonitoringList
if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
return false, err
} else if len(podMonitoringList.Items) == 0 {
var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
return false, err
} else if len(clusterPodMonitoringList.Items) == 0 {
return false, nil
}
}
return true, nil
}

@@ -196,12 +203,20 @@ func (r *targetStatusReconciler) Reconcile(ctx context.Context, _ reconcile.Requ

// pollAndUpdate fetches and updates the target status in each collector pod.
func pollAndUpdate(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) error {
allPodMonitorings, err := fetchAllPodMonitorings(ctx, kubeClient)
if err != nil {
return err
}
if len(allPodMonitorings) == 0 {
// Nothing to update.
return nil
}
targets, err := fetchTargets(ctx, logger, opts, httpClient, getTarget, kubeClient)
if err != nil {
return err
}

return updateTargetStatus(ctx, logger, kubeClient, targets)
return updateTargetStatus(ctx, logger, kubeClient, targets, allPodMonitorings)
}

// fetchTargets retrieves the Prometheus targets using the given target function
@@ -307,13 +322,14 @@ func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, obj

// updateTargetStatus populates the status object of each pod using the given
// Prometheus targets.
func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult) error {
func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult, podMonitorings []monitoringv1.PodMonitoringCRD) error {
endpointMap, err := buildEndpointStatuses(targets)
if err != nil {
return err
}

var errs []error
withStatuses := map[string]bool{}
for job, endpointStatuses := range endpointMap {
pm, err := getObjectByScrapeJobKey(job)
if err != nil {
@@ -324,6 +340,7 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
// Skip hard-coded jobs which we do not patch.
continue
}
withStatuses[pm.GetName()] = true
pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses

if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
@@ -335,6 +352,18 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
}
}

// Any PodMonitorings that exist but have no endpoint statuses should also be updated so stale statuses are cleared.
for _, pm := range podMonitorings {
if _, exists := withStatuses[pm.GetName()]; !exists {
pm.GetPodMonitoringStatus().EndpointStatuses = []monitoringv1.ScrapeEndpointStatus{}
if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
// Same reasoning as above for error handling.
errs = append(errs, err)
logger.Error(err, "patching empty status", "pm", pm.GetName(), "gvk", pm.GetObjectKind().GroupVersionKind())
}
}
}

return errors.Join(errs...)
}

168 changes: 127 additions & 41 deletions pkg/operator/target_status_test.go
@@ -45,11 +45,26 @@ import (
)

type updateTargetStatusTestCase struct {
desc string
targets []*prometheusv1.TargetsResult
podMonitorings []monitoringv1.PodMonitoring
clusterPodMonitorings []monitoringv1.ClusterPodMonitoring
expErr func(err error) bool
desc string
targets []*prometheusv1.TargetsResult
podMonitorings []monitoringv1.PodMonitoring
initializeStatus []monitoringv1.PodMonitoringStatus
clusterPodMonitorings []monitoringv1.ClusterPodMonitoring
initializeClusterStatus []monitoringv1.PodMonitoringStatus
expErr func(err error) bool
}

func (tc *updateTargetStatusTestCase) getPodMonitoringCRDs() []monitoringv1.PodMonitoringCRD {
var combinedList []monitoringv1.PodMonitoringCRD

for _, pm := range tc.podMonitorings {
combinedList = append(combinedList, &pm)
}

for _, pm := range tc.clusterPodMonitorings {
combinedList = append(combinedList, &pm)
}
return combinedList
}

// Given a list of test cases on PodMonitoring, creates a new list containing
@@ -84,6 +99,7 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
}
clusterTargets = append(clusterTargets, targetClusterPodMonitoring)
}

for _, pm := range tc.podMonitorings {
pmCopy := pm.DeepCopy()
cpm := monitoringv1.ClusterPodMonitoring{
@@ -103,26 +119,41 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
}
clusterPodMonitorings = append(clusterPodMonitorings, cpm)
}

initializeClusterStatus := make([]monitoringv1.PodMonitoringStatus, 0, len(tc.initializeStatus))
for _, status := range tc.initializeStatus {
statusCopy := status.DeepCopy()

for idx, status := range statusCopy.EndpointStatuses {
statusCopy.EndpointStatuses[idx].Name = podMonitoringScrapePoolToClusterPodMonitoringScrapePool(status.Name)
}
initializeClusterStatus = append(initializeClusterStatus, *statusCopy)
}

dataPodMonitorings := updateTargetStatusTestCase{
desc: tc.desc + "-pod-monitoring",
targets: tc.targets,
podMonitorings: tc.podMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-pod-monitoring",
targets: tc.targets,
podMonitorings: tc.podMonitorings,
initializeStatus: tc.initializeStatus,
expErr: tc.expErr,
}
dataFinal = append(dataFinal, dataPodMonitorings)
dataClusterPodMonitorings := updateTargetStatusTestCase{
desc: tc.desc + "-cluster-pod-monitoring",
targets: clusterTargets,
clusterPodMonitorings: clusterPodMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-cluster-pod-monitoring",
targets: clusterTargets,
clusterPodMonitorings: clusterPodMonitorings,
initializeClusterStatus: initializeClusterStatus,
expErr: tc.expErr,
}
prometheusTargetsBoth := append(tc.targets, clusterTargets...)
dataBoth := updateTargetStatusTestCase{
desc: tc.desc + "-both",
targets: prometheusTargetsBoth,
podMonitorings: tc.podMonitorings,
clusterPodMonitorings: clusterPodMonitorings,
expErr: tc.expErr,
desc: tc.desc + "-both",
targets: prometheusTargetsBoth,
podMonitorings: tc.podMonitorings,
initializeStatus: tc.initializeStatus,
clusterPodMonitorings: clusterPodMonitorings,
initializeClusterStatus: initializeClusterStatus,
expErr: tc.expErr,
}
dataFinal = append(dataFinal, dataClusterPodMonitorings)
dataFinal = append(dataFinal, dataBoth)
@@ -1225,27 +1256,100 @@ func TestUpdateTargetStatus(t *testing.T) {
return err.Error() == "unknown scrape kind \"unknown\""
},
},
// No targets, with PodMonitoring config.
{
desc: "no-targets-no-match",
podMonitorings: []monitoringv1.PodMonitoring{
{
ObjectMeta: metav1.ObjectMeta{Name: "prom-example-1", Namespace: "gmp-test"},
Spec: monitoringv1.PodMonitoringSpec{},

Status: monitoringv1.PodMonitoringStatus{
MonitoringStatus: monitoringv1.MonitoringStatus{
ObservedGeneration: 2,
Conditions: []monitoringv1.MonitoringCondition{{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Time{},
LastTransitionTime: metav1.Time{},
Reason: "",
Message: "",
}},
},
},
},
},
initializeStatus: []monitoringv1.PodMonitoringStatus{
{
MonitoringStatus: monitoringv1.MonitoringStatus{
ObservedGeneration: 2,
Conditions: []monitoringv1.MonitoringCondition{{
Type: monitoringv1.ConfigurationCreateSuccess,
Status: corev1.ConditionTrue,
LastUpdateTime: metav1.Time{},
LastTransitionTime: metav1.Time{},
Reason: "",
Message: "",
}},
},
EndpointStatuses: []monitoringv1.ScrapeEndpointStatus{
{
Name: "PodMonitoring/gmp-test/prom-example-1/metrics",
ActiveTargets: 1,
UnhealthyTargets: 0,
LastUpdateTime: date,
SampleGroups: []monitoringv1.SampleGroup{
{
SampleTargets: []monitoringv1.SampleTarget{
{
Health: "up",
Labels: map[model.LabelName]model.LabelValue{
"instance": "a",
},
LastScrapeDurationSeconds: "1.2",
},
},
Count: ptr.To(int32(1)),
},
},
CollectorsFraction: "1",
},
},
},
},
},
})

for _, testCase := range testCases {
t.Run(fmt.Sprintf("target-status-conversion-%s", testCase.desc), func(t *testing.T) {
clientBuilder := newFakeClientBuilder()
for _, podMonitoring := range testCase.podMonitorings {
for i, podMonitoring := range testCase.podMonitorings {
pmCopy := podMonitoring.DeepCopy()
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
if len(testCase.initializeStatus) > 0 {
pmCopy.Status = testCase.initializeStatus[i]
} else {
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
}
clientBuilder.WithObjects(pmCopy)
}
for _, clusterPodMonitoring := range testCase.clusterPodMonitorings {
for i, clusterPodMonitoring := range testCase.clusterPodMonitorings {
pmCopy := clusterPodMonitoring.DeepCopy()
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
if len(testCase.initializeClusterStatus) > 0 {
pmCopy.Status = testCase.initializeClusterStatus[i]
} else {
pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
}
clientBuilder.WithObjects(pmCopy)
}

kubeClient := clientBuilder.Build()

err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets)
err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets, testCase.getPodMonitoringCRDs())
if err != nil && (testCase.expErr == nil || !testCase.expErr(err)) {
t.Fatalf("unexpected error updating target status: %s", err)
} else if err == nil && (testCase.expErr != nil) {
t.Fatalf("expected error missing when updating target status")
}

for _, podMonitoring := range testCase.podMonitorings {
@@ -1602,24 +1706,6 @@ func TestShouldPoll(t *testing.T) {
should: false,
expErr: true,
},
{
desc: "should not poll targets - no podmonitorings",
objs: []client.Object{
&monitoringv1.OperatorConfig{
ObjectMeta: metav1.ObjectMeta{
Name: "config",
Namespace: "gmp-public",
},
Features: monitoringv1.OperatorFeatures{
TargetStatus: monitoringv1.TargetStatusSpec{
Enabled: true,
},
},
},
},
should: false,
expErr: false,
},
{
desc: "should not poll targets - disabled",
objs: []client.Object{