Skip to content

Commit

Permalink
add FeatureGates and page-size flags about paginated list
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <[email protected]>
  • Loading branch information
Iceber committed Nov 15, 2023
1 parent d0fa41a commit bd4eeff
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 61 deletions.
3 changes: 2 additions & 1 deletion cmd/clustersynchro-manager/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

type Config struct {
Expand All @@ -19,8 +20,8 @@ type Config struct {
WorkerNumber int
MetricsServerConfig metrics.Config
KubeMetricsServerConfig *kubestatemetrics.ServerConfig
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
StorageFactory storage.StorageFactory
ClusterSyncConfig clustersynchro.ClusterSyncConfig

LeaderElection componentbaseconfig.LeaderElectionConfiguration
ClientConnection componentbaseconfig.ClientConnectionConfiguration
Expand Down
13 changes: 11 additions & 2 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

const (
Expand All @@ -44,7 +45,8 @@ type Options struct {
Metrics *metrics.Options
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")

syncfs := fss.FlagSet("resource sync")
syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

fs := fss.FlagSet("misc")
Expand Down Expand Up @@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) {

MetricsServerConfig: metricsConfig,
KubeMetricsServerConfig: kubeStateMetricsServerConfig,
MetricsStoreBuilder: metricsStoreBuilder,

ClusterSyncConfig: clustersynchro.ClusterSyncConfig{
MetricsStoreBuilder: metricsStoreBuilder,
PageSizeForResourceSync: o.PageSizeForResourceSync,
},

LeaderElection: o.LeaderElection,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command {
}

func Run(ctx context.Context, c *config.Config) error {
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder)
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig)

go func() {
metrics.RunServer(c.MetricsServerConfig)
Expand Down
36 changes: 21 additions & 15 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ClusterSyncConfig struct {
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
PageSizeForResourceSync int64
}

type ClusterSynchro struct {
name string

RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
Expand Down Expand Up @@ -69,7 +74,7 @@ type ClusterStatusUpdater interface {

type RetryableError error

func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) {
dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
if err != nil {
return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
Expand Down Expand Up @@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
ClusterStatusUpdater: updater,
storage: storage,

syncConfig: syncConfig,
healthChecker: healthChecker,
dynamicDiscovery: dynamicDiscovery,
listerWatcherFactory: listWatchFactory,
Expand All @@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
stopRunnerCh: make(chan struct{}),

storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),

metricsStoreBuilder: metricsStoreBuilder,
}

var refresherOnce sync.Once
Expand Down Expand Up @@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() {
}

var metricsStore *kubestatemetrics.MetricsStore
if s.metricsStoreBuilder != nil {
metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(
s.name,
config.syncResource,
config.kind,
s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
rvs,
config.convertor,
resourceStorage,
metricsStore,
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
},
)
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type Config struct {

// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64

ForcePaginatedList bool
StreamHandleForPaginatedList bool
}

type controller struct {
Expand Down Expand Up @@ -86,6 +89,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
}
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.ForcePaginatedList = c.config.ForcePaginatedList
r.StreamHandleForPaginatedList = c.config.StreamHandleForPaginatedList

c.reflectorMutex.Lock()
c.reflector = r
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,40 +19,59 @@ type resourceVersionInformer struct {
listerWatcher cache.ListerWatcher
}

func NewResourceVersionInformer(name string, lw cache.ListerWatcher, storage *ResourceVersionStorage, exampleObject runtime.Object, handler ResourceEventHandler, errorHandler WatchErrorHandler, extraStore ExtraStore) ResourceVersionInformer {
type InformerConfig struct {
cache.ListerWatcher
Storage *ResourceVersionStorage

ExampleObject runtime.Object
Handler ResourceEventHandler
ErrorHandler WatchErrorHandler
ExtraStore ExtraStore

// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64
ForcePaginatedList bool
StreamHandleForPaginatedList bool
}

func NewResourceVersionInformer(name string, config InformerConfig) ResourceVersionInformer {
if name == "" {
panic("name is required")
}

// storage: NewResourceVersionStorage(cache.DeletionHandlingMetaNamespaceKeyFunc),
informer := &resourceVersionInformer{
name: name,
listerWatcher: lw,
storage: storage,
handler: handler,
listerWatcher: config.ListerWatcher,
storage: config.Storage,
handler: config.Handler,
}

var queue cache.Queue = cache.NewDeltaFIFOWithOptions(cache.DeltaFIFOOptions{
KeyFunction: cache.DeletionHandlingMetaNamespaceKeyFunc,
KnownObjects: informer.storage,
EmitDeltaTypeReplaced: true,
})
if extraStore != nil {
queue = &queueWithExtraStore{Queue: queue, extra: extraStore}
if config.ExtraStore != nil {
queue = &queueWithExtraStore{Queue: queue, extra: config.ExtraStore}
}

config := &Config{
ListerWatcher: lw,
ObjectType: exampleObject,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
deltas := obj.(cache.Deltas)
return informer.HandleDeltas(deltas, isInInitialList)
informer.controller = NewNamedController(informer.name,
&Config{
ListerWatcher: config.ListerWatcher,
ObjectType: config.ExampleObject,
RetryOnError: false,
Process: func(obj interface{}, isInInitialList bool) error {
deltas := obj.(cache.Deltas)
return informer.HandleDeltas(deltas, isInInitialList)
},
Queue: queue,
WatchErrorHandler: config.ErrorHandler,
WatchListPageSize: config.WatchListPageSize,
ForcePaginatedList: config.ForcePaginatedList,
StreamHandleForPaginatedList: config.StreamHandleForPaginatedList,
},
Queue: queue,
WatchErrorHandler: errorHandler,
}
informer.controller = NewNamedController(informer.name, config)
)
return informer
}

Expand Down
64 changes: 47 additions & 17 deletions pkg/synchromanager/clustersynchro/resource_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,32 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ResourceSynchroConfig struct {
schema.GroupVersionResource
Kind string

cache.ListerWatcher
runtime.ObjectConvertor
storage.ResourceStorage

*kubestatemetrics.MetricsStore

ResourceVersions map[string]interface{}
PageSizeForInformer int64
}

func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind {
return c.GroupVersionResource.GroupVersion().WithKind(c.Kind)
}

type ResourceSynchro struct {
cluster string

example runtime.Object
syncResource schema.GroupVersionResource
storageResource schema.GroupVersionResource

pageSize int64
listerWatcher cache.ListerWatcher
metricsExtraStore informer.ExtraStore
metricsWriter *metricsstore.MetricsWriter
Expand Down Expand Up @@ -69,23 +88,22 @@ type ResourceSynchro struct {
runningStage string
}

func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource, kind string, lw cache.ListerWatcher, rvs map[string]interface{},
convertor runtime.ObjectConvertor, storage storage.ResourceStorage, metricsStore *kubestatemetrics.MetricsStore,
) *ResourceSynchro {
storageConfig := storage.GetStorageConfig()
func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro {
storageConfig := config.ResourceStorage.GetStorageConfig()
synchro := &ResourceSynchro{
cluster: cluster,
syncResource: syncResource,
syncResource: config.GroupVersionResource,
storageResource: storageConfig.StorageGroupResource.WithVersion(storageConfig.StorageVersion.Version),

listerWatcher: lw,
rvs: rvs,
pageSize: config.PageSizeForInformer,
listerWatcher: config.ListerWatcher,
rvs: config.ResourceVersions,

// all resources saved to the queue are `runtime.Object`
queue: queue.NewPressureQueue(cache.MetaNamespaceKeyFunc),

storage: storage,
convertor: convertor,
storage: config.ResourceStorage,
convertor: config.ObjectConvertor,
memoryVersion: storageConfig.MemoryVersion,

stopped: make(chan struct{}),
Expand All @@ -100,12 +118,12 @@ func newResourceSynchro(cluster string, syncResource schema.GroupVersionResource
synchro.ctx, synchro.cancel = context.WithCancel(context.Background())

example := &unstructured.Unstructured{}
example.SetGroupVersionKind(syncResource.GroupVersion().WithKind(kind))
example.SetGroupVersionKind(config.GroupVersionKind())
synchro.example = example

if metricsStore != nil {
synchro.metricsExtraStore = metricsStore
synchro.metricsWriter = metricsstore.NewMetricsWriter(metricsStore.MetricsStore)
if config.MetricsStore != nil {
synchro.metricsExtraStore = config.MetricsStore
synchro.metricsWriter = metricsstore.NewMetricsWriter(config.MetricsStore.MetricsStore)
}

synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "")
Expand Down Expand Up @@ -244,10 +262,22 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {
synchro.rvsLock.Unlock()
}

informer.NewResourceVersionInformer(
synchro.cluster, synchro.listerWatcher, synchro.cache,
synchro.example, synchro, synchro.ErrorHandler, synchro.metricsExtraStore,
).Run(informerStopCh)
config := informer.InformerConfig{
ListerWatcher: synchro.listerWatcher,
Storage: synchro.cache,
ExampleObject: synchro.example,
Handler: synchro,
ErrorHandler: synchro.ErrorHandler,
ExtraStore: synchro.metricsExtraStore,
WatchListPageSize: synchro.pageSize,
}
if clusterpediafeature.FeatureGate.Enabled(features.StreamHandlePaginatedListForResourceSync) {
config.StreamHandleForPaginatedList = true
}
if clusterpediafeature.FeatureGate.Enabled(features.ForcePaginatedListForResourceSync) {
config.ForcePaginatedList = true
}
informer.NewResourceVersionInformer(synchro.cluster, config).Run(informerStopCh)

// TODO(Iceber): Optimize status updates in case of storage exceptions
if !synchro.isRunnableForStorage.Load() {
Expand Down
Loading

0 comments on commit bd4eeff

Please sign in to comment.