From 367f1f4a7286a4cff863ac856d0f305201e38f0e Mon Sep 17 00:00:00 2001 From: Tiffany Pei Date: Wed, 12 Jun 2024 20:13:07 +0000 Subject: [PATCH] [metric] Reset declared_resource to 0 when commit updates with test fix (#1232) (#1254) * [metric] #3 Reset declared_resource to 0 when commit updates (#1232) The current OpenCensus library keeps a metric stream that is uniquely identified by commit alive even though no new values are issued. This makes the Otel Collector metricstransform processor receives all time value when calculating the max and gives false output. The temporary work around is to reset the stream of previous commit to 0 when parser sees new commit. b/321875474 * Use default timeout for metric value check (#1264) * test: Only check declared_resources metric (#1268) The fix https://github.com/GoogleContainerTools/kpt-config-sync/pull/1232 only affects declared_resources from Config Sync. The metric from RG controller may take longer to be ready. Skip checking for those target values for now. --- e2e/testcases/otel_collector_test.go | 60 ++++++++++++++++++++++++- pkg/declared/resources.go | 65 ++++++++++++++-------------- 2 files changed, 91 insertions(+), 34 deletions(-) diff --git a/e2e/testcases/otel_collector_test.go b/e2e/testcases/otel_collector_test.go index 13b6c0efba..4393922d7b 100644 --- a/e2e/testcases/otel_collector_test.go +++ b/e2e/testcases/otel_collector_test.go @@ -258,7 +258,7 @@ func TestGCMMetrics(t *testing.T) { if err != nil { nt.T.Fatal(err) } - _, err = retry.Retry(60*time.Second, func() error { + _, err = retry.Retry(120*time.Second, func() error { var err error for _, metricType := range GCMMetricTypes { descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType) @@ -270,6 +270,41 @@ func TestGCMMetrics(t *testing.T) { if err != nil { nt.T.Fatal(err) } + + nt.T.Log("Adding test namespace") + namespace := fake.NamespaceObject("foo") + nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace)) + nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace")) + if err := nt.WatchForAllSyncs(); err != nil { + nt.T.Fatal(err) + } + + nt.T.Log("Checking resource related metrics after adding test resource") + _, err = retry.Retry(nt.DefaultWaitTimeout, func() error { + descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, csmetrics.DeclaredResourcesName) + it := listMetricInGCM(ctx, nt, client, startTime, descriptor) + return validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasValue(3)) + }) + if err != nil { + nt.T.Fatal(err) + } + + nt.T.Log("Remove the test resource") + nt.Must(nt.RootRepos[configsync.RootSyncName].Remove("acme/ns.yaml")) + nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove the test namespace")) + if err := nt.WatchForAllSyncs(); err != nil { + nt.T.Fatal(err) + } + + nt.T.Log("Checking resource related metrics after removing test resource") + _, err = retry.Retry(nt.DefaultWaitTimeout, func() error { + descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, csmetrics.DeclaredResourcesName) + it := listMetricInGCM(ctx, nt, client, startTime, descriptor) + return validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasLatestValue(2)) + }) + if err != nil { + nt.T.Fatal(err) + } } // TestOtelCollectorGCMLabelAggregation validates that Google Cloud Monitoring @@ -451,6 +486,29 @@ func metricDoesNotHaveLabel(label string) metricValidatorFunc { } } +func metricHasValue(value int64) metricValidatorFunc { + return func(series *monitoringpb.TimeSeries) error { + points := series.GetPoints() + for _, point := range points { + if point.GetValue().GetInt64Value() == value { + return nil + } + } + return fmt.Errorf("expected metric to have value %v but did not find in response", value) + } +} + +func metricHasLatestValue(value int64) metricValidatorFunc { + return func(series *monitoringpb.TimeSeries) error { + points := series.GetPoints() + lastPoint := points[len(points)-1] + if lastPoint.GetValue().GetInt64Value() == value { + return nil + } + return fmt.Errorf("expected metric to have latest value %v but did not find in response", value) + } +} + // Validates a metricType from a specific cluster_name can be found within given // TimeSeries func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error { diff --git a/pkg/declared/resources.go b/pkg/declared/resources.go index b331c2f3bc..4177e49996 100644 --- a/pkg/declared/resources.go +++ b/pkg/declared/resources.go @@ -41,10 +41,14 @@ type Resources struct { objectSet map[core.ID]*unstructured.Unstructured // commit of the source in which the resources were declared commit string + // previousCommit is the preceding commit to the commit + previousCommit string } // Update performs an atomic update on the resource declaration set. func (r *Resources) Update(ctx context.Context, objects []client.Object, commit string) ([]client.Object, status.Error) { + r.mutex.Lock() + defer r.mutex.Unlock() // First build up the new map using a local pointer/reference. newSet := make(map[core.ID]*unstructured.Unstructured) newObjects := []client.Object{} @@ -67,83 +71,78 @@ func (r *Resources) Update(ctx context.Context, objects []client.Object, commit // Record the declared_resources metric, after parsing but before validation. metrics.RecordDeclaredResources(ctx, commit, len(newObjects)) + if r.previousCommit != commit && r.previousCommit != "" { + // For Cloud Monitoring, we have configured otel-collector to remove the + // commit label, to reduce cardinality, but this aggregation uses the max + // value (b/321875474). So in order for the latest commit to be chosen as + // the max value, we reset the previous commit value to zero. + // TODO: Remove this workaround after migrating to the otel-collector metrics client and switching from gauge to async gauge + metrics.RecordDeclaredResources(ctx, r.previousCommit, 0) + } - previousSet, _ := r.getObjectSet() - if err := deletesAllNamespaces(previousSet, newSet); err != nil { + if err := deletesAllNamespaces(r.objectSet, newSet); err != nil { return nil, err } - // Now assign the pointer for the new map to the struct reference in a - // threadsafe context. From now on, this map is read-only. - r.setObjectSet(newSet, commit) + r.previousCommit = commit + r.objectSet = newSet + r.commit = commit return newObjects, nil } // Get returns a copy of the resource declaration as read from Git func (r *Resources) Get(id core.ID) (*unstructured.Unstructured, string, bool) { - objSet, commit := r.getObjectSet() - - // A local reference to the map is threadsafe since only the struct reference - // is replaced on update. - u, found := objSet[id] + r.mutex.RLock() + defer r.mutex.RUnlock() + u, found := r.objectSet[id] // We return a copy of the Unstructured, as // 1) client.Client methods mutate the objects passed into them. // 2) We don't want to persist any changes made to an object we retrieved // from a declared.Resources. - return u.DeepCopy(), commit, found + return u.DeepCopy(), r.commit, found } // DeclaredUnstructureds returns all resource objects declared in the source, // along with the source commit. func (r *Resources) DeclaredUnstructureds() ([]*unstructured.Unstructured, string) { + r.mutex.RLock() + defer r.mutex.RUnlock() var objects []*unstructured.Unstructured - objSet, commit := r.getObjectSet() // A local reference to the map is threadsafe since only the struct reference // is replaced on update. - for _, obj := range objSet { + for _, obj := range r.objectSet { objects = append(objects, obj) } - return objects, commit + return objects, r.commit } // DeclaredObjects returns all resource objects declared in the source, along // with the source commit. func (r *Resources) DeclaredObjects() ([]client.Object, string) { + r.mutex.RLock() + defer r.mutex.RUnlock() var objects []client.Object - objSet, commit := r.getObjectSet() // A local reference to the map is threadsafe since only the struct reference // is replaced on update. - for _, obj := range objSet { + for _, obj := range r.objectSet { objects = append(objects, obj) } - return objects, commit + return objects, r.commit } // DeclaredGVKs returns the set of all GroupVersionKind found in the source, // along with the source commit. func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string) { + r.mutex.RLock() + defer r.mutex.RUnlock() gvkSet := make(map[schema.GroupVersionKind]struct{}) - objSet, commit := r.getObjectSet() // A local reference to the objSet map is threadsafe since only the pointer to // the map is replaced on update. - for _, obj := range objSet { + for _, obj := range r.objectSet { gvkSet[obj.GroupVersionKind()] = struct{}{} } - return gvkSet, commit -} - -func (r *Resources) getObjectSet() (map[core.ID]*unstructured.Unstructured, string) { - r.mutex.RLock() - defer r.mutex.RUnlock() - return r.objectSet, r.commit -} - -func (r *Resources) setObjectSet(objectSet map[core.ID]*unstructured.Unstructured, commit string) { - r.mutex.Lock() - defer r.mutex.Unlock() - r.objectSet = objectSet - r.commit = commit + return gvkSet, r.commit }