From bd4eeff88d904ef18210fcbf6b3495d043de1d67 Mon Sep 17 00:00:00 2001
From: Iceber Gu <caiwei95@hotmail.com>
Date: Wed, 15 Nov 2023 13:59:32 +0800
Subject: [PATCH] add FeatureGates and page-size flags about paginated list

Signed-off-by: Iceber Gu <caiwei95@hotmail.com>
---
 .../app/config/config.go                      |  3 +-
 .../app/options/options.go                    | 13 +++-
 cmd/clustersynchro-manager/app/synchro.go     |  2 +-
 .../clustersynchro/cluster_synchro.go         | 36 ++++++-----
 .../informer/named_controller.go              |  5 ++
 .../informer/resourceversion_informer.go      | 53 ++++++++++-----
 .../clustersynchro/resource_synchro.go        | 64 ++++++++++++++-----
 pkg/synchromanager/clustersynchro_manager.go  | 16 ++---
 pkg/synchromanager/features/features.go       |  7 ++
 9 files changed, 138 insertions(+), 61 deletions(-)

diff --git a/cmd/clustersynchro-manager/app/config/config.go b/cmd/clustersynchro-manager/app/config/config.go
index 3d1baa0d8..003be8164 100644
--- a/cmd/clustersynchro-manager/app/config/config.go
+++ b/cmd/clustersynchro-manager/app/config/config.go
@@ -9,6 +9,7 @@ import (
 	kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
 	metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
+	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
 )
 
 type Config struct {
@@ -19,8 +20,8 @@ type Config struct {
 	WorkerNumber            int
 	MetricsServerConfig     metrics.Config
 	KubeMetricsServerConfig *kubestatemetrics.ServerConfig
-	MetricsStoreBuilder     *kubestatemetrics.MetricsStoreBuilder
 	StorageFactory          storage.StorageFactory
+	ClusterSyncConfig       clustersynchro.ClusterSyncConfig
 
 	LeaderElection   componentbaseconfig.LeaderElectionConfiguration
 	ClientConnection componentbaseconfig.ClientConnectionConfiguration
diff --git a/cmd/clustersynchro-manager/app/options/options.go b/cmd/clustersynchro-manager/app/options/options.go
index 95849694b..1e19109db 100644
--- a/cmd/clustersynchro-manager/app/options/options.go
+++ b/cmd/clustersynchro-manager/app/options/options.go
@@ -26,6 +26,7 @@ import (
 	"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
 	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
 	storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
+	"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
 )
 
 const (
@@ -44,7 +45,8 @@ type Options struct {
 	Metrics          *metrics.Options
 	KubeStateMetrics *kubestatemetrics.Options
 
-	WorkerNumber int // WorkerNumber is the number of worker goroutines
+	WorkerNumber            int // WorkerNumber is the number of worker goroutines
+	PageSizeForResourceSync int64
 }
 
 func NewClusterSynchroManagerOptions() (*Options, error) {
@@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
 	genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
 	genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")
 
+	syncfs := fss.FlagSet("resource sync")
+	syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")
+
 	options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)
 
 	fs := fss.FlagSet("misc")
@@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) {
 
 		MetricsServerConfig:     metricsConfig,
 		KubeMetricsServerConfig: kubeStateMetricsServerConfig,
-		MetricsStoreBuilder:     metricsStoreBuilder,
+
+		ClusterSyncConfig: clustersynchro.ClusterSyncConfig{
+			MetricsStoreBuilder:     metricsStoreBuilder,
+			PageSizeForResourceSync: o.PageSizeForResourceSync,
+		},
 
 		LeaderElection: o.LeaderElection,
 	}, nil
diff --git a/cmd/clustersynchro-manager/app/synchro.go b/cmd/clustersynchro-manager/app/synchro.go
index d28b620c7..67ee102c4 100644
--- a/cmd/clustersynchro-manager/app/synchro.go
+++ b/cmd/clustersynchro-manager/app/synchro.go
@@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command {
 }
 
 func Run(ctx context.Context, c *config.Config) error {
-	synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder)
+	synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig)
 
 	go func() {
 		metrics.RunServer(c.MetricsServerConfig)
diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go
index e15974b45..527c7a259 100644
--- a/pkg/synchromanager/clustersynchro/cluster_synchro.go
+++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go
@@ -25,6 +25,11 @@ import (
 	clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
 )
 
+type ClusterSyncConfig struct {
+	MetricsStoreBuilder     *kubestatemetrics.MetricsStoreBuilder
+	PageSizeForResourceSync int64
+}
+
 type ClusterSynchro struct {
 	name string
 
@@ -32,7 +37,7 @@ type ClusterSynchro struct {
 	ClusterStatusUpdater ClusterStatusUpdater
 
 	storage              storage.StorageFactory
-	metricsStoreBuilder  *kubestatemetrics.MetricsStoreBuilder
+	syncConfig           ClusterSyncConfig
 	healthChecker        *healthChecker
 	dynamicDiscovery     discovery.DynamicDiscoveryInterface
 	listerWatcherFactory informer.DynamicListerWatcherFactory
@@ -69,7 +74,7 @@ type ClusterStatusUpdater interface {
 
 type RetryableError error
 
-func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
+func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) {
 	dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
 	if err != nil {
 		return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
@@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
 		ClusterStatusUpdater: updater,
 		storage:              storage,
 
+		syncConfig:           syncConfig,
 		healthChecker:        healthChecker,
 		dynamicDiscovery:     dynamicDiscovery,
 		listerWatcherFactory: listWatchFactory,
@@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
 		stopRunnerCh:   make(chan struct{}),
 
 		storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),
-
-		metricsStoreBuilder: metricsStoreBuilder,
 	}
 
 	var refresherOnce sync.Once
@@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() {
 			}
 
 			var metricsStore *kubestatemetrics.MetricsStore
-			if s.metricsStoreBuilder != nil {
-				metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
+			if s.syncConfig.MetricsStoreBuilder != nil {
+				metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
 			}
-			synchro := newResourceSynchro(
-				s.name,
-				config.syncResource,
-				config.kind,
-				s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
-				rvs,
-				config.convertor,
-				resourceStorage,
-				metricsStore,
+			synchro := newResourceSynchro(s.name,
+				ResourceSynchroConfig{
+					GroupVersionResource: config.syncResource,
+					Kind:                 config.kind,
+					ListerWatcher:        s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
+					ObjectConvertor:      config.convertor,
+					ResourceStorage:      resourceStorage,
+					MetricsStore:         metricsStore,
+					ResourceVersions:     rvs,
+					PageSizeForInformer:  s.syncConfig.PageSizeForResourceSync,
+				},
 			)
 			s.waitGroup.StartWithChannel(s.closer, synchro.Run)
 			s.storageResourceSynchros.Store(storageGVR, synchro)
diff --git a/pkg/synchromanager/clustersynchro/informer/named_controller.go b/pkg/synchromanager/clustersynchro/informer/named_controller.go
index bf6407049..624c57e70 100644
--- a/pkg/synchromanager/clustersynchro/informer/named_controller.go
+++ b/pkg/synchromanager/clustersynchro/informer/named_controller.go
@@ -49,6 +49,9 @@ type Config struct {
 
 	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
 	WatchListPageSize int64
+
+	ForcePaginatedList           bool
+	StreamHandleForPaginatedList bool
 }
 
 type controller struct {
@@ -86,6 +89,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
 	}
 	r.ShouldResync = c.config.ShouldResync
 	r.WatchListPageSize = c.config.WatchListPageSize
+	r.ForcePaginatedList = c.config.ForcePaginatedList
+	r.StreamHandleForPaginatedList = c.config.StreamHandleForPaginatedList
 
 	c.reflectorMutex.Lock()
 	c.reflector = r
diff --git a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go
index ce5ade459..90a460279 100644
--- a/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go
+++ b/pkg/synchromanager/clustersynchro/informer/resourceversion_informer.go
@@ -19,7 +19,22 @@ type resourceVersionInformer struct {
 	listerWatcher cache.ListerWatcher
 }
 
-func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *ResourceVersionStorage, exampleObject runtime.Object, handler ResourceEventHandler, errorHandler WatchErrorHandler, extraStore ExtraStore) ResourceVersionInformer {
+type InformerConfig struct {
+	cache.ListerWatcher
+	Storage *ResourceVersionStorage
+
+	ExampleObject runtime.Object
+	Handler       ResourceEventHandler
+	ErrorHandler  WatchErrorHandler
+	ExtraStore    ExtraStore
+
+	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
+	WatchListPageSize            int64
+	ForcePaginatedList           bool
+	StreamHandleForPaginatedList bool
+}
+
+func NewResourceVersionInformer(name string, config InformerConfig) ResourceVersionInformer {
 	if name == "" {
 		panic("name is required")
 	}
@@ -27,9 +42,9 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
 	// storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc),
 	informer := &resourceVersionInformer{
 		name:          name,
-		listerWatcher: lw,
-		storage:       storage,
-		handler:       handler,
+		listerWatcher: config.ListerWatcher,
+		storage:       config.Storage,
+		handler:       config.Handler,
 	}
 
 	var queue cache.Queue = cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
@@ -37,22 +52,26 @@ func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *Re
 		KnownObjects:          informer.storage,
 		EmitDeltaTypeReplaced: true,
 	})
-	if extraStore != nil {
-		queue = &queueWithExtraStore{Queue: queue, extra: extraStore}
+	if config.ExtraStore != nil {
+		queue = &queueWithExtraStore{Queue: queue, extra: config.ExtraStore}
 	}
 
-	config := &Config{
-		ListerWatcher: lw,
-		ObjectType:    exampleObject,
-		RetryOnError:  false,
-		Process: func(obj interface{}, isInInitialList bool) error {
-			deltas := obj.(cache.Deltas)
-			return informer.HandleDeltas(deltas, isInInitialList)
+	informer.controller = NewNamedController(informer.name,
+		&Config{
+			ListerWatcher: config.ListerWatcher,
+			ObjectType:    config.ExampleObject,
+			RetryOnError:  false,
+			Process: func(obj interface{}, isInInitialList bool) error {
+				deltas := obj.(cache.Deltas)
+				return informer.HandleDeltas(deltas, isInInitialList)
+			},
+			Queue:                        queue,
+			WatchErrorHandler:            config.ErrorHandler,
+			WatchListPageSize:            config.WatchListPageSize,
+			ForcePaginatedList:           config.ForcePaginatedList,
+			StreamHandleForPaginatedList: config.StreamHandleForPaginatedList,
 		},
-		Queue:             queue,
-		WatchErrorHandler: errorHandler,
-	}
-	informer.controller = NewNamedController(informer.name, config)
+	)
 	return informer
 }
 
diff --git a/pkg/synchromanager/clustersynchro/resource_synchro.go b/pkg/synchromanager/clustersynchro/resource_synchro.go
index 0e0ade89f..6f8e406b0 100644
--- a/pkg/synchromanager/clustersynchro/resource_synchro.go
+++ b/pkg/synchromanager/clustersynchro/resource_synchro.go
@@ -28,6 +28,24 @@ import (
 	clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
 )
 
+type ResourceSynchroConfig struct {
+	schema.GroupVersionResource
+	Kind string
+
+	cache.ListerWatcher
+	runtime.ObjectConvertor
+	storage.ResourceStorage
+
+	*kubestatemetrics.MetricsStore
+
+	ResourceVersions    map[string]interface{}
+	PageSizeForInformer int64
+}
+
+func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind {
+	return c.GroupVersionResource.GroupVersion().WithKind(c.Kind)
+}
+
 type ResourceSynchro struct {
 	cluster string
 
@@ -35,6 +53,7 @@ type ResourceSynchro struct {
 	syncResource    schema.GroupVersionResource
 	storageResource schema.GroupVersionResource
 
+	pageSize          int64
 	listerWatcher     cache.ListerWatcher
 	metricsExtraStore informer.ExtraStore
 	metricsWriter     *metricsstore.MetricsWriter
@@ -69,23 +88,22 @@ type ResourceSynchro struct {
 	runningStage string
 }
 
-func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource, kind string, lw cache.ListerWatcher, rvs map[string]interface{},
-	convertor runtime.ObjectConvertor, storage storage.ResourceStorage, metricsStore *kubestatemetrics.MetricsStore,
-) *ResourceSynchro {
-	storageConfig := storage.GetStorageConfig()
+func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro {
+	storageConfig := config.ResourceStorage.GetStorageConfig()
 	synchro := &ResourceSynchro{
 		cluster:         cluster,
-		syncResource:    syncResource,
+		syncResource:    config.GroupVersionResource,
 		storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version),
 
-		listerWatcher: lw,
-		rvs:           rvs,
+		pageSize:      config.PageSizeForInformer,
+		listerWatcher: config.ListerWatcher,
+		rvs:           config.ResourceVersions,
 
 		// all resources saved to the queue are `runtime.Object`
 		queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc),
 
-		storage:       storage,
-		convertor:     convertor,
+		storage:       config.ResourceStorage,
+		convertor:     config.ObjectConvertor,
 		memoryVersion: storageConfig.MemoryVersion,
 
 		stopped:              make(chan struct{}),
@@ -100,12 +118,12 @@ func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource
 	synchro.ctx, synchro.cancel = context.WithCancel(context.Background())
 
 	example := &unstructured.Unstructured{}
-	example.SetGroupVersionKind(syncResource.GroupVersion().WithKind(kind))
+	example.SetGroupVersionKind(config.GroupVersionKind())
 	synchro.example = example
 
-	if metricsStore != nil {
-		synchro.metricsExtraStore = metricsStore
-		synchro.metricsWriter = metricsstore.NewMetricsWriter(metricsStore.MetricsStore)
+	if config.MetricsStore != nil {
+		synchro.metricsExtraStore = config.MetricsStore
+		synchro.metricsWriter = metricsstore.NewMetricsWriter(config.MetricsStore.MetricsStore)
 	}
 
 	synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "")
@@ -244,10 +262,22 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {
 			synchro.rvsLock.Unlock()
 		}
 
-		informer.NewResourceVersionInformer(
-			synchro.cluster, synchro.listerWatcher, synchro.cache,
-			synchro.example, synchro, synchro.ErrorHandler, synchro.metricsExtraStore,
-		).Run(informerStopCh)
+		config := informer.InformerConfig{
+			ListerWatcher:     synchro.listerWatcher,
+			Storage:           synchro.cache,
+			ExampleObject:     synchro.example,
+			Handler:           synchro,
+			ErrorHandler:      synchro.ErrorHandler,
+			ExtraStore:        synchro.metricsExtraStore,
+			WatchListPageSize: synchro.pageSize,
+		}
+		if clusterpediafeature.FeatureGate.Enabled(features.StreamHandlePaginatedListForResourceSync) {
+			config.StreamHandleForPaginatedList = true
+		}
+		if clusterpediafeature.FeatureGate.Enabled(features.ForcePaginatedListForResourceSync) {
+			config.ForcePaginatedList = true
+		}
+		informer.NewResourceVersionInformer(synchro.cluster, config).Run(informerStopCh)
 
 		// TODO(Iceber): Optimize status updates in case of storage exceptions
 		if !synchro.isRunnableForStorage.Load() {
diff --git a/pkg/synchromanager/clustersynchro_manager.go b/pkg/synchromanager/clustersynchro_manager.go
index 00aef44fe..bffde8a8f 100644
--- a/pkg/synchromanager/clustersynchro_manager.go
+++ b/pkg/synchromanager/clustersynchro_manager.go
@@ -50,19 +50,19 @@ type Manager struct {
 
 	queue                      workqueue.RateLimitingInterface
 	storage                    storage.StorageFactory
-	metricsStoreBuilder        *kubestatemetrics.MetricsStoreBuilder
 	clusterlister              clusterlister.PediaClusterLister
 	clusterSyncResourcesLister clusterlister.ClusterSyncResourcesLister
 	clusterInformer            cache.SharedIndexInformer
 
-	synchrolock      sync.RWMutex
-	synchros         map[string]*clustersynchro.ClusterSynchro
-	synchroWaitGroup wait.Group
+	clusterSyncConfig clustersynchro.ClusterSyncConfig
+	synchrolock       sync.RWMutex
+	synchros          map[string]*clustersynchro.ClusterSynchro
+	synchroWaitGroup  wait.Group
 }
 
 var _ kubestatemetrics.ClusterMetricsWriterListGetter = &Manager{}
 
-func NewManager(client crdclientset.Interface, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder) *Manager {
+func NewManager(client crdclientset.Interface, storage storage.StorageFactory, syncConfig clustersynchro.ClusterSyncConfig) *Manager {
 	factory := externalversions.NewSharedInformerFactory(client, 0)
 	clusterinformer := factory.Cluster().V1alpha2().PediaClusters()
 	clusterSyncResourcesInformer := factory.Cluster().V1alpha2().ClusterSyncResources()
@@ -72,7 +72,6 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, m
 		clusterpediaclient: client,
 
 		storage:                    storage,
-		metricsStoreBuilder:        metricsStoreBuilder,
 		clusterlister:              clusterinformer.Lister(),
 		clusterInformer:            clusterinformer.Informer(),
 		clusterSyncResourcesLister: clusterSyncResourcesInformer.Lister(),
@@ -80,7 +79,8 @@ func NewManager(client crdclientset.Interface, storage storage.StorageFactory, m
 			NewItemExponentialFailureAndJitterSlowRateLimter(2*time.Second, 15*time.Second, 1*time.Minute, 1.0, defaultRetryNum),
 		),
 
-		synchros: make(map[string]*clustersynchro.ClusterSynchro),
+		clusterSyncConfig: syncConfig,
+		synchros:          make(map[string]*clustersynchro.ClusterSynchro),
 	}
 
 	if _, err := clusterinformer.Informer().AddEventHandler(
@@ -348,7 +348,7 @@ func (manager *Manager) reconcileCluster(cluster *clusterv1alpha2.PediaCluster)
 
 	// create resource synchro
 	if synchro == nil {
-		synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager.metricsStoreBuilder, manager)
+		synchro, err = clustersynchro.New(cluster.Name, config, manager.storage, manager, manager.clusterSyncConfig)
 		if err != nil {
 			_, forever := err.(clustersynchro.RetryableError)
 			klog.ErrorS(err, "Failed to create cluster synchro", "cluster", cluster.Name)
diff --git a/pkg/synchromanager/features/features.go b/pkg/synchromanager/features/features.go
index e46f85d01..5efce4459 100644
--- a/pkg/synchromanager/features/features.go
+++ b/pkg/synchromanager/features/features.go
@@ -45,6 +45,10 @@ const (
 	// owner: @iceber
 	// alpha: v0.6.0
 	HealthCheckerWithStandaloneTCP featuregate.Feature = "HealthCheckerWithStandaloneTCP"
+
+	ForcePaginatedListForResourceSync featuregate.Feature = "ForcePaginatedListForResourceSync"
+
+	StreamHandlePaginatedListForResourceSync featuregate.Feature = "StreamHandlePaginatedListForResourceSync"
 )
 
 func init() {
@@ -59,4 +63,7 @@ var defaultClusterSynchroManagerFeatureGates = map[featuregate.Feature]featurega
 	AllowSyncAllCustomResources:    {Default: false, PreRelease: featuregate.Alpha},
 	AllowSyncAllResources:          {Default: false, PreRelease: featuregate.Alpha},
 	HealthCheckerWithStandaloneTCP: {Default: false, PreRelease: featuregate.Alpha},
+
+	ForcePaginatedListForResourceSync:        {Default: false, PreRelease: featuregate.Alpha},
+	StreamHandlePaginatedListForResourceSync: {Default: false, PreRelease: featuregate.Alpha},
 }