diff --git a/pkg/operator/target_status.go b/pkg/operator/target_status.go index 617fdd9fe0..c2ca570094 100644 --- a/pkg/operator/target_status.go +++ b/pkg/operator/target_status.go @@ -128,7 +128,27 @@ func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer, httpC return nil } -// shouldPoll verifies if polling collectors is configured or necessary. +// fetchAllPodMonitorings fetches all ClusterPodMonitoring and PodMonitoring CRs deployed in the cluster. This excludes ClusterNodeMonitoring CRs. +func fetchAllPodMonitorings(ctx context.Context, kubeClient client.Client) ([]monitoringv1.PodMonitoringCRD, error) { + var combinedList []monitoringv1.PodMonitoringCRD + var podMonitoringList monitoringv1.PodMonitoringList + if err := kubeClient.List(ctx, &podMonitoringList); err != nil { + return nil, err + } + for _, pm := range podMonitoringList.Items { + combinedList = append(combinedList, &pm) + } + var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList + if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil { + return nil, err + } + for _, pm := range clusterPodMonitoringList.Items { + combinedList = append(combinedList, &pm) + } + return combinedList, nil +} + +// shouldPoll verifies if polling collectors is configured. func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kubeClient client.Client) (bool, error) { // Check if target status is enabled. var config monitoringv1.OperatorConfig @@ -141,19 +161,6 @@ func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kub if !config.Features.TargetStatus.Enabled { return false, nil } - - // No need to poll if there's no PodMonitorings. - var podMonitoringList monitoringv1.PodMonitoringList - if err := kubeClient.List(ctx, &podMonitoringList); err != nil { - return false, err - } else if len(podMonitoringList.Items) == 0 { - var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList - if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil { - return false, err - } else if len(clusterPodMonitoringList.Items) == 0 { - return false, nil - } - } return true, nil } @@ -196,12 +203,20 @@ func (r *targetStatusReconciler) Reconcile(ctx context.Context, _ reconcile.Requ // pollAndUpdate fetches and updates the target status in each collector pod. func pollAndUpdate(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) error { + allPodMonitorings, err := fetchAllPodMonitorings(ctx, kubeClient) + if err != nil { + return err + } + if len(allPodMonitorings) == 0 { + // Nothing to update. + return nil + } targets, err := fetchTargets(ctx, logger, opts, httpClient, getTarget, kubeClient) if err != nil { return err } - return updateTargetStatus(ctx, logger, kubeClient, targets) + return updateTargetStatus(ctx, logger, kubeClient, targets, allPodMonitorings) } // fetchTargets retrieves the Prometheus targets using the given target function @@ -307,13 +322,14 @@ func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, obj // updateTargetStatus populates the status object of each pod using the given // Prometheus targets. -func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult) error { +func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult, podMonitorings []monitoringv1.PodMonitoringCRD) error { endpointMap, err := buildEndpointStatuses(targets) if err != nil { return err } var errs []error + withStatuses := map[string]bool{} for job, endpointStatuses := range endpointMap { pm, err := getObjectByScrapeJobKey(job) if err != nil { @@ -324,6 +340,7 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie // Skip hard-coded jobs which we do not patch. continue } + withStatuses[pm.GetName()] = true pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil { @@ -335,6 +352,18 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie } } + // Any pod monitorings that exist but don't have endpoints should also be updated. + for _, pm := range podMonitorings { + if _, exists := withStatuses[pm.GetName()]; !exists { + pm.GetPodMonitoringStatus().EndpointStatuses = []monitoringv1.ScrapeEndpointStatus{} + if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil { + // Same reasoning as above for error handling. + errs = append(errs, err) + logger.Error(err, "patching empty status", "pm", pm.GetName(), "gvk", pm.GetObjectKind().GroupVersionKind()) + } + } + } + return errors.Join(errs...) } diff --git a/pkg/operator/target_status_test.go b/pkg/operator/target_status_test.go index 4cb8984529..dfd64f1a30 100644 --- a/pkg/operator/target_status_test.go +++ b/pkg/operator/target_status_test.go @@ -45,11 +45,26 @@ import ( ) type updateTargetStatusTestCase struct { - desc string - targets []*prometheusv1.TargetsResult - podMonitorings []monitoringv1.PodMonitoring - clusterPodMonitorings []monitoringv1.ClusterPodMonitoring - expErr func(err error) bool + desc string + targets []*prometheusv1.TargetsResult + podMonitorings []monitoringv1.PodMonitoring + initializeStatus []monitoringv1.PodMonitoringStatus + clusterPodMonitorings []monitoringv1.ClusterPodMonitoring + initializeClusterStatus []monitoringv1.PodMonitoringStatus + expErr func(err error) bool +} + +func (tc *updateTargetStatusTestCase) getPodMonitoringCRDs() []monitoringv1.PodMonitoringCRD { + var combinedList []monitoringv1.PodMonitoringCRD + + for _, pm := range tc.podMonitorings { + combinedList = append(combinedList, &pm) + } + + for _, pm := range tc.clusterPodMonitorings { + combinedList = append(combinedList, &pm) + } + return combinedList } // Given a list of test cases on PodMonitoring, creates a new list containing @@ -84,6 +99,7 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase } clusterTargets = append(clusterTargets, targetClusterPodMonitoring) } + for _, pm := range tc.podMonitorings { pmCopy := pm.DeepCopy() cpm := monitoringv1.ClusterPodMonitoring{ @@ -103,26 +119,41 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase } clusterPodMonitorings = append(clusterPodMonitorings, cpm) } + + initializeClusterStatus := make([]monitoringv1.PodMonitoringStatus, 0, len(tc.initializeStatus)) + for _, status := range tc.initializeStatus { + statusCopy := status.DeepCopy() + + for idx, status := range statusCopy.EndpointStatuses { + statusCopy.EndpointStatuses[idx].Name = podMonitoringScrapePoolToClusterPodMonitoringScrapePool(status.Name) + } + initializeClusterStatus = append(initializeClusterStatus, *statusCopy) + } + dataPodMonitorings := updateTargetStatusTestCase{ - desc: tc.desc + "-pod-monitoring", - targets: tc.targets, - podMonitorings: tc.podMonitorings, - expErr: tc.expErr, + desc: tc.desc + "-pod-monitoring", + targets: tc.targets, + podMonitorings: tc.podMonitorings, + initializeStatus: tc.initializeStatus, + expErr: tc.expErr, } dataFinal = append(dataFinal, dataPodMonitorings) dataClusterPodMonitorings := updateTargetStatusTestCase{ - desc: tc.desc + "-cluster-pod-monitoring", - targets: clusterTargets, - clusterPodMonitorings: clusterPodMonitorings, - expErr: tc.expErr, + desc: tc.desc + "-cluster-pod-monitoring", + targets: clusterTargets, + clusterPodMonitorings: clusterPodMonitorings, + initializeClusterStatus: initializeClusterStatus, + expErr: tc.expErr, } prometheusTargetsBoth := append(tc.targets, clusterTargets...) dataBoth := updateTargetStatusTestCase{ - desc: tc.desc + "-both", - targets: prometheusTargetsBoth, - podMonitorings: tc.podMonitorings, - clusterPodMonitorings: clusterPodMonitorings, - expErr: tc.expErr, + desc: tc.desc + "-both", + targets: prometheusTargetsBoth, + podMonitorings: tc.podMonitorings, + initializeStatus: tc.initializeStatus, + clusterPodMonitorings: clusterPodMonitorings, + initializeClusterStatus: initializeClusterStatus, + expErr: tc.expErr, } dataFinal = append(dataFinal, dataClusterPodMonitorings) dataFinal = append(dataFinal, dataBoth) @@ -1225,27 +1256,100 @@ func TestUpdateTargetStatus(t *testing.T) { return err.Error() == "unknown scrape kind \"unknown\"" }, }, + // No targets, with PodMonitoring config. + { + desc: "no-targets-no-match", + podMonitorings: []monitoringv1.PodMonitoring{ + { + ObjectMeta: metav1.ObjectMeta{Name: "prom-example-1", Namespace: "gmp-test"}, + Spec: monitoringv1.PodMonitoringSpec{}, + + Status: monitoringv1.PodMonitoringStatus{ + MonitoringStatus: monitoringv1.MonitoringStatus{ + ObservedGeneration: 2, + Conditions: []monitoringv1.MonitoringCondition{{ + Type: monitoringv1.ConfigurationCreateSuccess, + Status: corev1.ConditionTrue, + LastUpdateTime: metav1.Time{}, + LastTransitionTime: metav1.Time{}, + Reason: "", + Message: "", + }}, + }, + }, + }, + }, + initializeStatus: []monitoringv1.PodMonitoringStatus{ + { + MonitoringStatus: monitoringv1.MonitoringStatus{ + ObservedGeneration: 2, + Conditions: []monitoringv1.MonitoringCondition{{ + Type: monitoringv1.ConfigurationCreateSuccess, + Status: corev1.ConditionTrue, + LastUpdateTime: metav1.Time{}, + LastTransitionTime: metav1.Time{}, + Reason: "", + Message: "", + }}, + }, + EndpointStatuses: []monitoringv1.ScrapeEndpointStatus{ + { + Name: "PodMonitoring/gmp-test/prom-example-1/metrics", + ActiveTargets: 1, + UnhealthyTargets: 0, + LastUpdateTime: date, + SampleGroups: []monitoringv1.SampleGroup{ + { + SampleTargets: []monitoringv1.SampleTarget{ + { + Health: "up", + Labels: map[model.LabelName]model.LabelValue{ + "instance": "a", + }, + LastScrapeDurationSeconds: "1.2", + }, + }, + Count: ptr.To(int32(1)), + }, + }, + CollectorsFraction: "1", + }, + }, + }, + }, + }, }) for _, testCase := range testCases { t.Run(fmt.Sprintf("target-status-conversion-%s", testCase.desc), func(t *testing.T) { clientBuilder := newFakeClientBuilder() - for _, podMonitoring := range testCase.podMonitorings { + for i, podMonitoring := range testCase.podMonitorings { pmCopy := podMonitoring.DeepCopy() - pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil + if len(testCase.initializeStatus) > 0 { + pmCopy.Status = testCase.initializeStatus[i] + } else { + pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil + } clientBuilder.WithObjects(pmCopy) } - for _, clusterPodMonitoring := range testCase.clusterPodMonitorings { + for i, clusterPodMonitoring := range testCase.clusterPodMonitorings { pmCopy := clusterPodMonitoring.DeepCopy() - pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil + if len(testCase.initializeClusterStatus) > 0 { + pmCopy.Status = testCase.initializeClusterStatus[i] + } else { + pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil + } clientBuilder.WithObjects(pmCopy) } kubeClient := clientBuilder.Build() - err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets) + // fetchTargets(ctx, logger, opts, nil, targetFetchFromMap(prometheusTargetMap), kubeClient) + err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets, testCase.getPodMonitoringCRDs()) if err != nil && (testCase.expErr == nil || !testCase.expErr(err)) { t.Fatalf("unexpected error updating target status: %s", err) + } else if err == nil && (testCase.expErr != nil) { + t.Fatalf("expected error missing when updating target status") } for _, podMonitoring := range testCase.podMonitorings { @@ -1602,24 +1706,6 @@ func TestShouldPoll(t *testing.T) { should: false, expErr: true, }, - { - desc: "should not poll targets - no podmonitorings", - objs: []client.Object{ - &monitoringv1.OperatorConfig{ - ObjectMeta: metav1.ObjectMeta{ - Name: "config", - Namespace: "gmp-public", - }, - Features: monitoringv1.OperatorFeatures{ - TargetStatus: monitoringv1.TargetStatusSpec{ - Enabled: true, - }, - }, - }, - }, - should: false, - expErr: false, - }, { desc: "should not poll targets - disabled", objs: []client.Object{