clustersynchro: add resource synchro metrics
Signed-off-by: Iceber Gu <[email protected]>
Iceber committed Jan 2, 2025
1 parent fc6817d commit 1b50d4b
Showing 6 changed files with 492 additions and 7 deletions.
48 changes: 48 additions & 0 deletions cmd/clustersynchro-manager/app/options/metrics.go
@@ -0,0 +1,48 @@
package options

import (
"github.com/spf13/pflag"

metricsserver "github.com/clusterpedia-io/clusterpedia/pkg/metrics/server"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
)

type MetricsOptions struct {
Metrics *metricsserver.Options

ResourceSynchroMetricsLabels string
}

func NewMetricsOptions() *MetricsOptions {
return &MetricsOptions{
Metrics: metricsserver.NewOptions(),
ResourceSynchroMetricsLabels: resourcesynchro.DefaultMetricsWrapperFactory.Config().ScopeLabels(),
}
}

func (o *MetricsOptions) AddFlags(fs *pflag.FlagSet) {
o.Metrics.AddFlags(fs)

fs.StringVar(&o.ResourceSynchroMetricsLabels, "resource-synchro-metrics-labels", o.ResourceSynchroMetricsLabels, "The resource synchronizer's metrics aggregation scope, which supports 'empty', 'cluster', 'gv', 'gvr', 'cluster,gv', 'cluster,gvr', etc.")
}

func (o *MetricsOptions) Validate() []error {
errs := o.Metrics.Validate()

if _, err := resourcesynchro.ParseToMetricsWrapperConfig(o.ResourceSynchroMetricsLabels); err != nil {
errs = append(errs, err)
}
return errs
}

func (o *MetricsOptions) ServerConfig() metricsserver.Config {
return o.Metrics.Config()
}

func (o *MetricsOptions) ResourceSynchroConfig() resourcesynchro.MetricsWrapperConfig {
config, err := resourcesynchro.ParseToMetricsWrapperConfig(o.ResourceSynchroMetricsLabels)
if err != nil {
panic(err)
}
return config
}
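
For context, a minimal sketch of how these options are typically wired up by a caller, assuming standard pflag wiring (this main function is illustrative, not part of the commit):

package main

import (
	"github.com/spf13/pflag"
	"k8s.io/klog/v2"

	"github.com/clusterpedia-io/clusterpedia/cmd/clustersynchro-manager/app/options"
)

func main() {
	opts := options.NewMetricsOptions()
	opts.AddFlags(pflag.CommandLine)
	pflag.Parse()

	// Validate surfaces a malformed --resource-synchro-metrics-labels value early,
	// so the later ResourceSynchroConfig call cannot reach its panic path.
	if errs := opts.Validate(); len(errs) > 0 {
		klog.Fatalf("invalid metrics options: %v", errs)
	}

	serverCfg := opts.ServerConfig()           // metrics server configuration
	synchroCfg := opts.ResourceSynchroConfig() // parsed aggregation scope
	_, _ = serverCfg, synchroCfg
}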
12 changes: 8 additions & 4 deletions cmd/clustersynchro-manager/app/options/options.go
@@ -23,10 +23,10 @@ import (
"github.com/clusterpedia-io/clusterpedia/cmd/clustersynchro-manager/app/config"
crdclientset "github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
metricsserver "github.com/clusterpedia-io/clusterpedia/pkg/metrics/server"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
)

const (
@@ -42,7 +42,7 @@ type Options struct {

Logs *logs.Options
Storage *storageoptions.StorageOptions
Metrics *metricsserver.Options
Metrics *MetricsOptions
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
@@ -76,7 +76,7 @@ func NewClusterSynchroManagerOptions() (*Options, error) {

options.Logs = logs.NewOptions()
options.Storage = storageoptions.NewStorageOptions()
options.Metrics = metricsserver.NewOptions()
options.Metrics = NewMetricsOptions()
options.KubeStateMetrics = kubestatemetrics.NewOptions()

options.WorkerNumber = 5
@@ -155,13 +155,17 @@ func (o *Options) Config() (*config.Config, error) {
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
eventRecorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: ClusterSynchroManagerUserAgent})

metricsConfig := o.Metrics.Config()
metricsConfig := o.Metrics.ServerConfig()
metricsStoreBuilder, err := o.KubeStateMetrics.MetricsStoreBuilderConfig().New()
if err != nil {
return nil, err
}
kubeStateMetricsServerConfig := o.KubeStateMetrics.ServerConfig(metricsConfig)

if config := o.Metrics.ResourceSynchroConfig(); config != resourcesynchro.DefaultMetricsWrapperFactory.Config() {
resourcesynchro.DefaultMetricsWrapperFactory = resourcesynchro.NewMetricsWrapperFactory(config)
}

if o.ShardingName != "" {
o.LeaderElection.ResourceName = fmt.Sprintf("%s-%s", o.LeaderElection.ResourceName, o.ShardingName)
}
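
The ResourceSynchroConfig override in this file swaps in a new global wrapper factory only when the parsed scope differs from the default. A sketch of the parse round-trip this relies on; ParseToMetricsWrapperConfig and ScopeLabels are taken from the diff, while the accepted scope values are assumed to match the flag help above:

package main

import (
	"fmt"

	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
)

func main() {
	// Scope values taken from the --resource-synchro-metrics-labels help text;
	// each selects a different aggregation scope for the synchro metrics.
	for _, scope := range []string{"empty", "cluster", "gv", "gvr", "cluster,gv", "cluster,gvr"} {
		cfg, err := resourcesynchro.ParseToMetricsWrapperConfig(scope)
		if err != nil {
			fmt.Printf("%-12s => %v\n", scope, err)
			continue
		}
		fmt.Printf("%-12s => %s\n", scope, cfg.ScopeLabels())
	}
}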
1 change: 1 addition & 0 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
@@ -129,6 +129,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat
synchro.resourceSynchroFactory = factory
} else {
synchro.resourceSynchroFactory = DefaultResourceSynchroFactory{}
registerResourceSynchroMetrics()
}

var refresherOnce sync.Once
34 changes: 31 additions & 3 deletions pkg/synchromanager/clustersynchro/default_resource_synchro.go
@@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
genericstorage "k8s.io/apiserver/pkg/storage"
"k8s.io/client-go/tools/cache"
compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/klog/v2"
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"

@@ -39,6 +40,7 @@ type resourceSynchro struct {
listerWatcher cache.ListerWatcher
metricsExtraStore informer.ExtraStore
metricsWriter *metricsstore.MetricsWriter
metricsWrapper resourcesynchro.MetricsWrapper

queue queue.EventQueue
cache *informer.ResourceVersionStorage
@@ -68,6 +70,8 @@ type resourceSynchro struct {

// for debug
runningStage string

storageMaxRetry int
}

type DefaultResourceSynchroFactory struct{}
@@ -88,9 +92,10 @@ func (factory DefaultResourceSynchroFactory) NewResourceSynchro(cluster string,
// all resources saved to the queue are `runtime.Object`
queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc),

storage: config.ResourceStorage,
convertor: config.ObjectConvertor,
memoryVersion: storageConfig.MemoryResource.GroupVersion(),
storage: config.ResourceStorage,
convertor: config.ObjectConvertor,
memoryVersion: storageConfig.MemoryResource.GroupVersion(),
metricsWrapper: resourcesynchro.DefaultMetricsWrapperFactory.NewWrapper(cluster, config.GroupVersionResource),

stopped: make(chan struct{}),
isRunnableForStorage: atomic.NewBool(true),
@@ -158,6 +163,10 @@ func (synchro *resourceSynchro) Run(shutdown <-chan struct{}) {

synchro.setStatus(clusterv1alpha2.ResourceSyncStatusStop, "", "")
synchro.runningStage = "shutdown"

for _, m := range resourceSynchroMetrics {
synchro.metricsWrapper.Delete(m.(resourcesynchro.DeletableVecMetrics))
}
}

func (synchro *resourceSynchro) Close() <-chan struct{} {
@@ -256,6 +265,7 @@ func (synchro *resourceSynchro) Start(stopCh <-chan struct{}) {
for r, v := range synchro.rvs {
rvs[r] = v
}
synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(rvs)))
synchro.cache = informer.NewResourceVersionStorage()
synchro.rvsLock.Unlock()

@@ -403,45 +413,63 @@ func (synchro *resourceSynchro) handleResourceEvent(event *queue.Event) {
}
utils.InjectClusterName(obj, synchro.cluster)

var metric compbasemetrics.CounterMetric
switch event.Action {
case queue.Added:
handler = synchro.createOrUpdateResource
metric = synchro.metricsWrapper.Counter(resourceAddedCounter)
case queue.Updated:
handler = synchro.updateOrCreateResource
metric = synchro.metricsWrapper.Counter(resourceUpdatedCounter)
}
callback = func(obj runtime.Object) {
metric.Inc()
metaobj, _ := meta.Accessor(obj)
synchro.rvsLock.Lock()
synchro.rvs[key] = metaobj.GetResourceVersion()

synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(synchro.rvs)))
synchro.rvsLock.Unlock()
}
} else {
handler, callback = synchro.deleteResource, func(_ runtime.Object) {
synchro.rvsLock.Lock()
delete(synchro.rvs, key)
synchro.metricsWrapper.Sum(storagedResourcesTotal, float64(len(synchro.rvs)))
synchro.rvsLock.Unlock()
synchro.metricsWrapper.Counter(resourceDeletedCounter).Inc()
}
}

// TODO(Iceber): put the event back into the queue to retry?
for i := 0; ; i++ {
now := time.Now()
ctx, cancel := context.WithTimeout(synchro.ctx, 30*time.Second)
err := handler(ctx, obj)
cancel()
if err == nil {
callback(obj)

if i != 0 && i > synchro.storageMaxRetry {
synchro.storageMaxRetry = i
synchro.metricsWrapper.Max(resourceMaxRetryGauge, float64(i))
}

if !synchro.isRunnableForStorage.Load() && synchro.queue.Len() == 0 {
// Start the informer after processing the data in the queue to ensure that storage is up and running for a period of time.
synchro.setRunnableForStorage()
}
synchro.metricsWrapper.Historgram(resourceStorageDuration).Observe(time.Since(now).Seconds())
return
}

if errors.Is(err, context.Canceled) {
return
}

synchro.metricsWrapper.Counter(resourceFailedCounter).Inc()
if !storage.IsRecoverableException(err) {
synchro.metricsWrapper.Counter(resourceDroppedCounter).Inc()
klog.ErrorS(err, "Failed to store resource", "cluster", synchro.cluster,
"action", event.Action, "resource", synchro.storageResource, "key", key)

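
The max-retry bookkeeping above keeps a local high-water mark (storageMaxRetry) so the gauge is only updated when a storage attempt sets a new record. A stripped-down sketch of that guard, with a plain callback standing in for metricsWrapper.Max:

package main

import "fmt"

// retryTracker mirrors the guard in handleResourceEvent: the local maximum
// avoids touching the gauge unless the retry count exceeds every previous one.
type retryTracker struct {
	storageMaxRetry int
	setGauge        func(v float64) // stands in for metricsWrapper.Max(resourceMaxRetryGauge, v)
}

func (t *retryTracker) observe(retries int) {
	if retries != 0 && retries > t.storageMaxRetry {
		t.storageMaxRetry = retries
		t.setGauge(float64(retries))
	}
}

func main() {
	t := &retryTracker{setGauge: func(v float64) { fmt.Println("max retries now:", v) }}
	for _, attempts := range []int{0, 2, 1, 5} {
		t.observe(attempts) // prints only for 2 and 5
	}
}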
156 changes: 156 additions & 0 deletions pkg/synchromanager/clustersynchro/default_resource_synchro_metrics.go
@@ -0,0 +1,156 @@
package clustersynchro

import (
"sync"

compbasemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/legacyregistry"

"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
)

// Note: The current design does not account for multiple different types of resource synchronizers running simultaneously.
// This can be revisited in future updates as requirements change.

const (
namespace = "clustersynchro"
subsystem = "resourcesynchro"
)

var (
// storagedResourcesTotal records the total number of resources stored in the storage layer.
storagedResourcesTotal *compbasemetrics.GaugeVec

// resourceAddedCounter records the number of times resources are added to the storage layer.
resourceAddedCounter *compbasemetrics.CounterVec

// resourceUpdatedCounter records the number of times resources are updated in the storage layer.
resourceUpdatedCounter *compbasemetrics.CounterVec

// resourceDeletedCounter records the number of times resources are deleted from the storage layer.
resourceDeletedCounter *compbasemetrics.CounterVec

// resourceFailedCounter records the number of times resource operations fail.
resourceFailedCounter *compbasemetrics.CounterVec

// resourceDroppedCounter records the number of times resources are dropped due to unrecoverable storage errors.
resourceDroppedCounter *compbasemetrics.CounterVec

// resourceMaxRetryGauge provides the maximum number of retries during resource operations.
resourceMaxRetryGauge *compbasemetrics.GaugeVec

// resourceStorageDuration records the time interval from when a resource is fetched from the queue to when it is processed.
resourceStorageDuration *compbasemetrics.HistogramVec
)

var resourceSynchroMetrics = []interface{}{
storagedResourcesTotal,
resourceAddedCounter,
resourceUpdatedCounter,
resourceDeletedCounter,
resourceFailedCounter,
resourceMaxRetryGauge,
resourceDroppedCounter,
resourceStorageDuration,
}

var registerOnce sync.Once

func registerResourceSynchroMetrics() {
registerOnce.Do(func() {
storagedResourcesTotal = resourcesynchro.DefaultMetricsWrapperFactory.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "storaged_resource_total",
Help: "Number of resources stored in the storage layer.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceAddedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_added_total",
Help: "Number of times resources are deleted from the storage layer.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceUpdatedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_updated_total",
Help: "Number of times resources are updated in the storage layer.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceDeletedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_deleted_total",
Help: "Number of times resources are deleted from the storage layer.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceFailedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_failed_total",
Help: "Number of times resource operations fail.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceMaxRetryGauge = resourcesynchro.DefaultMetricsWrapperFactory.NewGaugeVec(
&compbasemetrics.GaugeOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "resource_max_retry_total",
Help: "The maximum number of retries during resource operations.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceDroppedCounter = resourcesynchro.DefaultMetricsWrapperFactory.NewCounterVec(
&compbasemetrics.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "failed_resource_total",
Help: "Number of times resources are dropped.",
StabilityLevel: compbasemetrics.ALPHA,
},
)

resourceStorageDuration = resourcesynchro.DefaultMetricsWrapperFactory.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "storage_duration_seconds",
Help: "The time interval from when a resource is fetched from the queue to when it is processed.",
StabilityLevel: compbasemetrics.ALPHA,
Buckets: []float64{0.025, 0.05, 0.1, 0.2, 0.4, 0.6, 1.0, 1.5, 3, 5, 8, 15},
},
)

resourceSynchroMetrics = []interface{}{
storagedResourcesTotal,
resourceAddedCounter,
resourceUpdatedCounter,
resourceDeletedCounter,
resourceFailedCounter,
resourceMaxRetryGauge,
resourceDroppedCounter,
resourceStorageDuration,
}
for _, m := range resourceSynchroMetrics {
legacyregistry.MustRegister(m.(compbasemetrics.Registerable))
}
})
}
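
Since component-base metrics joins Namespace, Subsystem, and Name with underscores, the families registered above are exposed as:

clustersynchro_resourcesynchro_storaged_resource_total
clustersynchro_resourcesynchro_resource_added_total
clustersynchro_resourcesynchro_resource_updated_total
clustersynchro_resourcesynchro_resource_deleted_total
clustersynchro_resourcesynchro_resource_failed_total
clustersynchro_resourcesynchro_resource_max_retry_total
clustersynchro_resourcesynchro_failed_resource_total
clustersynchro_resourcesynchro_storage_duration_seconds

with the label set on each series determined by the configured aggregation scope.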