diff --git a/e2e/testcases/otel_collector_test.go b/e2e/testcases/otel_collector_test.go
index 13b6c0efba..4393922d7b 100644
--- a/e2e/testcases/otel_collector_test.go
+++ b/e2e/testcases/otel_collector_test.go
@@ -258,7 +258,7 @@ func TestGCMMetrics(t *testing.T) {
 	if err != nil {
 		nt.T.Fatal(err)
 	}
-	_, err = retry.Retry(60*time.Second, func() error {
+	_, err = retry.Retry(120*time.Second, func() error {
 		var err error
 		for _, metricType := range GCMMetricTypes {
 			descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, metricType)
@@ -270,6 +270,41 @@ func TestGCMMetrics(t *testing.T) {
 	if err != nil {
 		nt.T.Fatal(err)
 	}
+
+	nt.T.Log("Adding test namespace")
+	namespace := fake.NamespaceObject("foo")
+	nt.Must(nt.RootRepos[configsync.RootSyncName].Add("acme/ns.yaml", namespace))
+	nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding foo namespace"))
+	if err := nt.WatchForAllSyncs(); err != nil {
+		nt.T.Fatal(err)
+	}
+
+	nt.T.Log("Checking resource related metrics after adding test resource")
+	_, err = retry.Retry(nt.DefaultWaitTimeout, func() error {
+		descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, csmetrics.DeclaredResourcesName)
+		it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
+		return validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasValue(3))
+	})
+	if err != nil {
+		nt.T.Fatal(err)
+	}
+
+	nt.T.Log("Remove the test resource")
+	nt.Must(nt.RootRepos[configsync.RootSyncName].Remove("acme/ns.yaml"))
+	nt.Must(nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove the test namespace"))
+	if err := nt.WatchForAllSyncs(); err != nil {
+		nt.T.Fatal(err)
+	}
+
+	nt.T.Log("Checking resource related metrics after removing test resource")
+	_, err = retry.Retry(nt.DefaultWaitTimeout, func() error {
+		descriptor := fmt.Sprintf("%s/%s", GCMMetricPrefix, csmetrics.DeclaredResourcesName)
+		it := listMetricInGCM(ctx, nt, client, startTime, descriptor)
+		return validateMetricInGCM(nt, it, descriptor, nt.ClusterName, metricHasLatestValue(2))
+	})
+	if err != nil {
+		nt.T.Fatal(err)
+	}
 }
 
 // TestOtelCollectorGCMLabelAggregation validates that Google Cloud Monitoring
@@ -451,6 +486,45 @@ func metricDoesNotHaveLabel(label string) metricValidatorFunc {
 	}
 }
 
+// metricHasValue returns a validator that passes when at least one point in
+// the time series has the given int64 value.
+func metricHasValue(value int64) metricValidatorFunc {
+	return func(series *monitoringpb.TimeSeries) error {
+		points := series.GetPoints()
+		for _, point := range points {
+			if point.GetValue().GetInt64Value() == value {
+				return nil
+			}
+		}
+		return fmt.Errorf("expected metric to have value %v but did not find in response", value)
+	}
+}
+
+// metricHasLatestValue returns a validator that passes when the most recent
+// point in the time series has the given int64 value.
+func metricHasLatestValue(value int64) metricValidatorFunc {
+	return func(series *monitoringpb.TimeSeries) error {
+		points := series.GetPoints()
+		if len(points) == 0 {
+			// Indexing an empty slice would panic rather than return an error.
+			return fmt.Errorf("expected metric to have latest value %v but the time series has no points", value)
+		}
+		// Pick the latest point by interval end time rather than by slice position,
+		// so the check does not depend on the order in which points are returned.
+		latest := points[0]
+		latestEnd := latest.GetInterval().GetEndTime().AsTime()
+		for _, point := range points[1:] {
+			if end := point.GetInterval().GetEndTime().AsTime(); end.After(latestEnd) {
+				latest, latestEnd = point, end
+			}
+		}
+		if latest.GetValue().GetInt64Value() == value {
+			return nil
+		}
+		return fmt.Errorf("expected metric to have latest value %v but did not find in response", value)
+	}
+}
+
 // Validates a metricType from a specific cluster_name can be found within given
 // TimeSeries
 func validateMetricInGCM(nt *nomostest.NT, it *monitoringv2.TimeSeriesIterator, metricType, clusterName string, valFns ...metricValidatorFunc) error {
diff --git a/pkg/declared/resources.go b/pkg/declared/resources.go
index b331c2f3bc..4177e49996 100644
--- a/pkg/declared/resources.go
+++ b/pkg/declared/resources.go
@@ -41,10 +41,16 @@ type Resources struct {
 	objectSet map[core.ID]*unstructured.Unstructured
 	// commit of the source in which the resources were declared
 	commit string
+	// previousCommit is the commit recorded by the last successful Update. It
+	// is used to reset that commit's declared_resources metric to zero when a
+	// different commit is declared.
+	previousCommit string
 }
 
 // Update performs an atomic update on the resource declaration set.
 func (r *Resources) Update(ctx context.Context, objects []client.Object, commit string) ([]client.Object, status.Error) {
+	r.mutex.Lock()
+	defer r.mutex.Unlock()
 	// First build up the new map using a local pointer/reference.
 	newSet := make(map[core.ID]*unstructured.Unstructured)
 	newObjects := []client.Object{}
@@ -67,83 +73,78 @@ func (r *Resources) Update(ctx context.Context, objects []client.Object, commit
 
 	// Record the declared_resources metric, after parsing but before validation.
 	metrics.RecordDeclaredResources(ctx, commit, len(newObjects))
+	if r.previousCommit != commit && r.previousCommit != "" {
+		// For Cloud Monitoring, we have configured otel-collector to remove the
+		// commit label, to reduce cardinality, but this aggregation uses the max
+		// value (b/321875474). So in order for the latest commit to be chosen as
+		// the max value, we reset the previous commit value to zero.
+		// TODO: Remove this workaround after migrating to the otel-collector metrics client and switching from gauge to async gauge
+		metrics.RecordDeclaredResources(ctx, r.previousCommit, 0)
+	}
 
-	previousSet, _ := r.getObjectSet()
-	if err := deletesAllNamespaces(previousSet, newSet); err != nil {
+	if err := deletesAllNamespaces(r.objectSet, newSet); err != nil {
 		return nil, err
 	}
 
-	// Now assign the pointer for the new map to the struct reference in a
-	// threadsafe context. From now on, this map is read-only.
-	r.setObjectSet(newSet, commit)
+	r.previousCommit = commit
+	r.objectSet = newSet
+	r.commit = commit
 	return newObjects, nil
 }
 
 // Get returns a copy of the resource declaration as read from Git
 func (r *Resources) Get(id core.ID) (*unstructured.Unstructured, string, bool) {
-	objSet, commit := r.getObjectSet()
-
-	// A local reference to the map is threadsafe since only the struct reference
-	// is replaced on update.
-	u, found := objSet[id]
+	r.mutex.RLock()
+	defer r.mutex.RUnlock()
+	u, found := r.objectSet[id]
 	// We return a copy of the Unstructured, as
 	// 1) client.Client methods mutate the objects passed into them.
 	// 2) We don't want to persist any changes made to an object we retrieved
 	// from a declared.Resources.
-	return u.DeepCopy(), commit, found
+	return u.DeepCopy(), r.commit, found
 }
 
 // DeclaredUnstructureds returns all resource objects declared in the source,
 // along with the source commit.
 func (r *Resources) DeclaredUnstructureds() ([]*unstructured.Unstructured, string) {
+	r.mutex.RLock()
+	defer r.mutex.RUnlock()
 	var objects []*unstructured.Unstructured
-	objSet, commit := r.getObjectSet()
 
-	// A local reference to the map is threadsafe since only the struct reference
-	// is replaced on update.
-	for _, obj := range objSet {
+	// The read lock is held while iterating, so Update cannot replace the map
+	// until this method returns.
+	for _, obj := range r.objectSet {
 		objects = append(objects, obj)
 	}
-	return objects, commit
+	return objects, r.commit
 }
 
 // DeclaredObjects returns all resource objects declared in the source, along
 // with the source commit.
 func (r *Resources) DeclaredObjects() ([]client.Object, string) {
+	r.mutex.RLock()
+	defer r.mutex.RUnlock()
 	var objects []client.Object
-	objSet, commit := r.getObjectSet()
 
-	// A local reference to the map is threadsafe since only the struct reference
-	// is replaced on update.
-	for _, obj := range objSet {
+	// The read lock is held while iterating, so Update cannot replace the map
+	// until this method returns.
+	for _, obj := range r.objectSet {
 		objects = append(objects, obj)
 	}
-	return objects, commit
+	return objects, r.commit
 }
 
 // DeclaredGVKs returns the set of all GroupVersionKind found in the source,
 // along with the source commit.
 func (r *Resources) DeclaredGVKs() (map[schema.GroupVersionKind]struct{}, string) {
+	r.mutex.RLock()
+	defer r.mutex.RUnlock()
 	gvkSet := make(map[schema.GroupVersionKind]struct{})
-	objSet, commit := r.getObjectSet()
 
-	// A local reference to the objSet map is threadsafe since only the pointer to
-	// the map is replaced on update.
-	for _, obj := range objSet {
+	// The read lock is held while iterating, so Update cannot replace the map
+	// until this method returns.
+	for _, obj := range r.objectSet {
 		gvkSet[obj.GroupVersionKind()] = struct{}{}
 	}
-	return gvkSet, commit
-}
-
-func (r *Resources) getObjectSet() (map[core.ID]*unstructured.Unstructured, string) {
-	r.mutex.RLock()
-	defer r.mutex.RUnlock()
-	return r.objectSet, r.commit
-}
-
-func (r *Resources) setObjectSet(objectSet map[core.ID]*unstructured.Unstructured, commit string) {
-	r.mutex.Lock()
-	defer r.mutex.Unlock()
-	r.objectSet = objectSet
-	r.commit = commit
+	return gvkSet, r.commit
 }