diff --git a/cmd/binding-apiserver/app/binding_apiserver.go b/cmd/binding-apiserver/app/binding_apiserver.go index 1a0930901..bf0fbb5b2 100644 --- a/cmd/binding-apiserver/app/binding_apiserver.go +++ b/cmd/binding-apiserver/app/binding_apiserver.go @@ -19,6 +19,7 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned" "github.com/clusterpedia-io/clusterpedia/pkg/storage" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro" clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" "github.com/clusterpedia-io/clusterpedia/pkg/version/verflag" ) @@ -56,7 +57,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command { return err } - synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, nil) + synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, clustersynchro.ClusterSyncConfig{}) go synchromanager.Run(1, ctx.Done()) server, err := completedConfig.New() diff --git a/cmd/clustersynchro-manager/app/config/config.go b/cmd/clustersynchro-manager/app/config/config.go index 3d1baa0d8..003be8164 100644 --- a/cmd/clustersynchro-manager/app/config/config.go +++ b/cmd/clustersynchro-manager/app/config/config.go @@ -9,6 +9,7 @@ import ( kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics" metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics" "github.com/clusterpedia-io/clusterpedia/pkg/storage" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro" ) type Config struct { @@ -19,8 +20,8 @@ type Config struct { WorkerNumber int MetricsServerConfig metrics.Config KubeMetricsServerConfig *kubestatemetrics.ServerConfig - MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder StorageFactory storage.StorageFactory + ClusterSyncConfig clustersynchro.ClusterSyncConfig LeaderElection componentbaseconfig.LeaderElectionConfiguration ClientConnection componentbaseconfig.ClientConnectionConfiguration diff --git a/cmd/clustersynchro-manager/app/options/options.go b/cmd/clustersynchro-manager/app/options/options.go index 95849694b..1e19109db 100644 --- a/cmd/clustersynchro-manager/app/options/options.go +++ b/cmd/clustersynchro-manager/app/options/options.go @@ -26,6 +26,7 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/metrics" "github.com/clusterpedia-io/clusterpedia/pkg/storage" storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro" ) const ( @@ -44,7 +45,8 @@ type Options struct { Metrics *metrics.Options KubeStateMetrics *kubestatemetrics.Options - WorkerNumber int // WorkerNumber is the number of worker goroutines + WorkerNumber int // WorkerNumber is the number of worker goroutines + PageSizeForResourceSync int64 } func NewClusterSynchroManagerOptions() (*Options, error) { @@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets { genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.") genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.") + syncfs := fss.FlagSet("resource sync") + syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync") + options.BindLeaderElectionFlags(&o.LeaderElection, genericfs) fs := fss.FlagSet("misc") @@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) { MetricsServerConfig: metricsConfig, KubeMetricsServerConfig: kubeStateMetricsServerConfig, - MetricsStoreBuilder: metricsStoreBuilder, + + ClusterSyncConfig: clustersynchro.ClusterSyncConfig{ + MetricsStoreBuilder: metricsStoreBuilder, + PageSizeForResourceSync: o.PageSizeForResourceSync, + }, LeaderElection: o.LeaderElection, }, nil diff --git a/cmd/clustersynchro-manager/app/synchro.go b/cmd/clustersynchro-manager/app/synchro.go index d28b620c7..67ee102c4 100644 --- a/cmd/clustersynchro-manager/app/synchro.go +++ b/cmd/clustersynchro-manager/app/synchro.go @@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command { } func Run(ctx context.Context, c *config.Config) error { - synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder) + synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig) go func() { metrics.RunServer(c.MetricsServerConfig) diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index e15974b45..527c7a259 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -25,6 +25,11 @@ import ( clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) +type ClusterSyncConfig struct { + MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder + PageSizeForResourceSync int64 +} + type ClusterSynchro struct { name string @@ -32,7 +37,7 @@ type ClusterSynchro struct { ClusterStatusUpdater ClusterStatusUpdater storage storage.StorageFactory - metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder + syncConfig ClusterSyncConfig healthChecker *healthChecker dynamicDiscovery discovery.DynamicDiscoveryInterface listerWatcherFactory informer.DynamicListerWatcherFactory @@ -69,7 +74,7 @@ type ClusterStatusUpdater interface { type RetryableError error -func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) { +func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) { dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config) if err != nil { return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err)) @@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri ClusterStatusUpdater: updater, storage: storage, + syncConfig: syncConfig, healthChecker: healthChecker, dynamicDiscovery: dynamicDiscovery, listerWatcherFactory: listWatchFactory, @@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri stopRunnerCh: make(chan struct{}), storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}), - - metricsStoreBuilder: metricsStoreBuilder, } var refresherOnce sync.Once @@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() { } var metricsStore *kubestatemetrics.MetricsStore - if s.metricsStoreBuilder != nil { - metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource) + if s.syncConfig.MetricsStoreBuilder != nil { + metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource) } - synchro := newResourceSynchro( - s.name, - config.syncResource, - config.kind, - s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource), - rvs, - config.convertor, - resourceStorage, - metricsStore, + synchro := newResourceSynchro(s.name, + ResourceSynchroConfig{ + GroupVersionResource: config.syncResource, + Kind: config.kind, + ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource), + ObjectConvertor: config.convertor, + ResourceStorage: resourceStorage, + MetricsStore: metricsStore, + ResourceVersions: rvs, + PageSizeForInformer: s.syncConfig.PageSizeForResourceSync, + }, ) s.waitGroup.StartWithChannel(s.closer, synchro.Run) s.storageResourceSynchros.Store(storageGVR, synchro) diff --git a/pkg/synchromanager/clustersynchro/informer/named_controller.go b/pkg/synchromanager/clustersynchro/informer/named_controller.go index bf6407049..624c57e70 100644 --- a/pkg/synchromanager/clustersynchro/informer/named_controller.go +++ b/pkg/synchromanager/clustersynchro/informer/named_controller.go @@ -49,6 +49,9 @@ type Config struct { // WatchListPageSize is the requested chunk size of initial and relist watch lists. WatchListPageSize int64 + + ForcePaginatedList bool + StreamHandleForPaginatedList bool } type controller struct { @@ -86,6 +89,8 @@ func (c *controller) Run(stopCh <-chan struct{}) { } r.ShouldResync = c.config.ShouldResync r.WatchListPageSize = c.config.WatchListPageSize + r.ForcePaginatedList = c.config.ForcePaginatedList + r.StreamHandleForPaginatedList = c.config.StreamHandleForPaginatedList c.reflectorMutex.Lock() c.reflector = r diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go index ce5ade459..90a460279 100644 --- a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go +++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go @@ -19,7 +19,22 @@ type resourceVersionInformer struct { listerWatcher cache.ListerWatcher } -func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *ResourceVersionStorage, exampleObject runtime.Object, handler ResourceEventHandler, errorHandler WatchErrorHandler, extraStore ExtraStore) ResourceVersionInformer { +type InformerConfig struct { + cache.ListerWatcher + Storage *ResourceVersionStorage + + ExampleObject runtime.Object + Handler ResourceEventHandler + ErrorHandler WatchErrorHandler + ExtraStore ExtraStore + + // WatchListPageSize is the requested chunk size of initial and relist watch lists. + WatchListPageSize int64 + ForcePaginatedList bool + StreamHandleForPaginatedList bool +} + +func NewResourceVersionInformer(name string, config InformerConfig) ResourceVersionInformer { if name == "" { panic("name is required") } @@ -27,9 +42,9 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re // storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc), informer := &resourceVersionInformer{ name: name, - listerWatcher: lw, - storage: storage, - handler: handler, + listerWatcher: config.ListerWatcher, + storage: config.Storage, + handler: config.Handler, } var queue cache.Queue = cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{ @@ -37,22 +52,26 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re KnownObjects: informer.storage, EmitDeltaTypeReplaced: true, }) - if extraStore != nil { - queue = &queueWithExtraStore{Queue: queue, extra: extraStore} + if config.ExtraStore != nil { + queue = &queueWithExtraStore{Queue: queue, extra: config.ExtraStore} } - config := &Config{ - ListerWatcher: lw, - ObjectType: exampleObject, - RetryOnError: false, - Process: func(obj interface{}, isInInitialList bool) error { - deltas := obj.(cache.Deltas) - return informer.HandleDeltas(deltas, isInInitialList) + informer.controller = NewNamedController(informer.name, + &Config{ + ListerWatcher: config.ListerWatcher, + ObjectType: config.ExampleObject, + RetryOnError: false, + Process: func(obj interface{}, isInInitialList bool) error { + deltas := obj.(cache.Deltas) + return informer.HandleDeltas(deltas, isInInitialList) + }, + Queue: queue, + WatchErrorHandler: config.ErrorHandler, + WatchListPageSize: config.WatchListPageSize, + ForcePaginatedList: config.ForcePaginatedList, + StreamHandleForPaginatedList: config.StreamHandleForPaginatedList, }, - Queue: queue, - WatchErrorHandler: errorHandler, - } - informer.controller = NewNamedController(informer.name, config) + ) return informer } diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go index 0e0ade89f..6f8e406b0 100644 --- a/pkg/synchromanager/clustersynchro/resource_synchro.go +++ b/pkg/synchromanager/clustersynchro/resource_synchro.go @@ -28,6 +28,24 @@ import ( clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) +type ResourceSynchroConfig struct { + schema.GroupVersionResource + Kind string + + cache.ListerWatcher + runtime.ObjectConvertor + storage.ResourceStorage + + *kubestatemetrics.MetricsStore + + ResourceVersions map[string]interface{} + PageSizeForInformer int64 +} + +func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind { + return c.GroupVersionResource.GroupVersion().WithKind(c.Kind) +} + type ResourceSynchro struct { cluster string @@ -35,6 +53,7 @@ type ResourceSynchro struct { syncResource schema.GroupVersionResource storageResource schema.GroupVersionResource + pageSize int64 listerWatcher cache.ListerWatcher metricsExtraStore informer.ExtraStore metricsWriter *metricsstore.MetricsWriter @@ -69,23 +88,22 @@ type ResourceSynchro struct { runningStage string } -func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource, kind string, lw cache.ListerWatcher, rvs map[string]interface{}, - convertor runtime.ObjectConvertor, storage storage.ResourceStorage, metricsStore *kubestatemetrics.MetricsStore, -) *ResourceSynchro { - storageConfig := storage.GetStorageConfig() +func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro { + storageConfig := config.ResourceStorage.GetStorageConfig() synchro := &ResourceSynchro{ cluster: cluster, - syncResource: syncResource, + syncResource: config.GroupVersionResource, storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version), - listerWatcher: lw, - rvs: rvs, + pageSize: config.PageSizeForInformer, + listerWatcher: config.ListerWatcher, + rvs: config.ResourceVersions, // all resources saved to the queue are `runtime.Object` queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc), - storage: storage, - convertor: convertor, + storage: config.ResourceStorage, + convertor: config.ObjectConvertor, memoryVersion: storageConfig.MemoryVersion, stopped: make(chan struct{}), @@ -100,12 +118,12 @@ func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource synchro.ctx, synchro.cancel = context.WithCancel(context.Background()) example := &unstructured.Unstructured{} - example.SetGroupVersionKind(syncResource.GroupVersion().WithKind(kind)) + example.SetGroupVersionKind(config.GroupVersionKind()) synchro.example = example - if metricsStore != nil { - synchro.metricsExtraStore = metricsStore - synchro.metricsWriter = metricsstore.NewMetricsWriter(metricsStore.MetricsStore) + if config.MetricsStore != nil { + synchro.metricsExtraStore = config.MetricsStore + synchro.metricsWriter = metricsstore.NewMetricsWriter(config.MetricsStore.MetricsStore) } synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "") @@ -244,10 +262,22 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) { synchro.rvsLock.Unlock() } - informer.NewResourceVersionInformer( - synchro.cluster, synchro.listerWatcher, synchro.cache, - synchro.example, synchro, synchro.ErrorHandler, synchro.metricsExtraStore, - ).Run(informerStopCh) + config := informer.InformerConfig{ + ListerWatcher: synchro.listerWatcher, + Storage: synchro.cache, + ExampleObject: synchro.example, + Handler: synchro, + ErrorHandler: synchro.ErrorHandler, + ExtraStore: synchro.metricsExtraStore, + WatchListPageSize: synchro.pageSize, + } + if clusterpediafeature.FeatureGate.Enabled(features.StreamHandlePaginatedListForResourceSync) { + config.StreamHandleForPaginatedList = true + } + if clusterpediafeature.FeatureGate.Enabled(features.ForcePaginatedListForResourceSync) { + config.ForcePaginatedList = true + } + informer.NewResourceVersionInformer(synchro.cluster, config).Run(informerStopCh) // TODO(Iceber): Optimize status updates in case of storage exceptions if !synchro.isRunnableForStorage.Load() { diff --git a/pkg/synchromanager/clustersynchro_manager.go b/pkg/synchromanager/clustersynchro_manager.go index 00aef44fe..bffde8a8f 100644 --- a/pkg/synchromanager/clustersynchro_manager.go +++ b/pkg/synchromanager/clustersynchro_manager.go @@ -50,19 +50,19 @@ type Manager struct { queue workqueue.RateLimitingInterface storage storage.StorageFactory - metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder clusterlister clusterlister.PediaClusterLister clusterSyncResourcesLister clusterlister.ClusterSyncResourcesLister clusterInformer cache.SharedIndexInformer - synchrolock sync.RWMutex - synchros map[string]*clustersynchro.ClusterSynchro - synchroWaitGroup wait.Group + clusterSyncConfig clustersynchro.ClusterSyncConfig + synchrolock sync.RWMutex + synchros map[string]*clustersynchro.ClusterSynchro + synchroWaitGroup wait.Group } var _ kubestatemetrics.ClusterMetricsWriterListGetter = &Manager{} -func NewManager(client crdclientset.Interface, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder) *Manager { +func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig) *Manager { factory := externalversions.NewSharedInformerFactory(client, 0) clusterinformer := factory.Cluster().V1alpha2().PediaClusters() clusterSyncResourcesInformer := factory.Cluster().V1alpha2().ClusterSyncResources() @@ -72,7 +72,6 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, m clusterpediaclient: client, storage: storage, - metricsStoreBuilder: metricsStoreBuilder, clusterlister: clusterinformer.Lister(), clusterInformer: clusterinformer.Informer(), clusterSyncResourcesLister: clusterSyncResourcesInformer.Lister(), @@ -80,7 +79,8 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, m NewItemExponentialFailureAndJitterSlowRateLimter(2*time.Second, 15*time.Second, 1*time.Minute, 1.0, defaultRetryNum), ), - synchros: make(map[string]*clustersynchro.ClusterSynchro), + clusterSyncConfig: syncConfig, + synchros: make(map[string]*clustersynchro.ClusterSynchro), } if _, err := clusterinformer.Informer().AddEventHandler( @@ -348,7 +348,7 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster) // create resource synchro if synchro == nil { - synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager.metricsStoreBuilder, manager) + synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager, manager.clusterSyncConfig) if err != nil { _, forever := err.(clustersynchro.RetryableError) klog.ErrorS(err, "Failed to create cluster synchro", "cluster", cluster.Name) diff --git a/pkg/synchromanager/features/features.go b/pkg/synchromanager/features/features.go index e46f85d01..5efce4459 100644 --- a/pkg/synchromanager/features/features.go +++ b/pkg/synchromanager/features/features.go @@ -45,6 +45,10 @@ const ( // owner: @iceber // alpha: v0.6.0 HealthCheckerWithStandaloneTCP featuregate.Feature = "HealthCheckerWithStandaloneTCP" + + ForcePaginatedListForResourceSync featuregate.Feature = "ForcePaginatedListForResourceSync" + + StreamHandlePaginatedListForResourceSync featuregate.Feature = "StreamHandlePaginatedListForResourceSync" ) func init() { @@ -59,4 +63,7 @@ var defaultClusterSynchroManagerFeatureGates = map[featuregate.Feature]featurega AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha}, AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha}, HealthCheckerWithStandaloneTCP: {Default: false, PreRelease: featuregate.Alpha}, + + ForcePaginatedListForResourceSync: {Default: false, PreRelease: featuregate.Alpha}, + StreamHandlePaginatedListForResourceSync: {Default: false, PreRelease: featuregate.Alpha}, }