Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support for storage to implement custom resource synchro #677

Merged
merged 1 commit into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 34 additions & 19 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
resourceconfigfactory "github.com/clusterpedia-io/clusterpedia/pkg/runtime/resourceconfig/factory"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

Expand All @@ -36,11 +37,12 @@ type ClusterSynchro struct {
RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
storage storage.StorageFactory
resourceSynchroFactory resourcesynchro.SynchroFactory
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory

closeOnce sync.Once
closer chan struct{}
Expand Down Expand Up @@ -123,6 +125,12 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat
storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),
}

if factory, ok := storage.(resourcesynchro.SynchroFactory); ok {
synchro.resourceSynchroFactory = factory
} else {
synchro.resourceSynchroFactory = DefaultResourceSynchroFactory{}
}

var refresherOnce sync.Once
synchro.dynamicDiscovery.Prepare(discovery.PrepareConfig{
ResourceMutationHandler: synchro.resetSyncResources,
Expand Down Expand Up @@ -167,8 +175,9 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat

func (s *ClusterSynchro) GetMetricsWriterList() (writers metricsstore.MetricsWriterList) {
s.storageResourceSynchros.Range(func(_, value interface{}) bool {
if synchro := value.(*ResourceSynchro); synchro.metricsWriter != nil {
writers = append(writers, synchro.metricsWriter)
synchro := value.(resourcesynchro.Synchro)
if writer := synchro.GetMetricsWriter(); writer != nil {
writers = append(writers, writer)
}
return true
})
Expand Down Expand Up @@ -237,15 +246,15 @@ func (s *ClusterSynchro) Shutdown(updateStatus bool) {
shutdownCount := 0
statuses := make(map[string][]string)
s.storageResourceSynchros.Range(func(key, value interface{}) bool {
synchro := value.(*ResourceSynchro)
synchro := value.(resourcesynchro.Synchro)
status := synchro.Status()
if status.Status == clusterv1alpha2.ResourceSyncStatusStop && status.Reason == "" {
shutdownCount++
return true
}

gvr := key.(schema.GroupVersionResource)
sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.runningStage)
sr := fmt.Sprintf("%s,%s,%s", status.Status, status.Reason, synchro.Stage())
msg := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource)
statuses[sr] = append(statuses[sr], msg)
return true
Expand Down Expand Up @@ -359,18 +368,23 @@ func (s *ClusterSynchro) refreshSyncResources() {
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
synchro, err := s.resourceSynchroFactory.NewResourceSynchro(s.name,
resourcesynchro.Config{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
ResourceStorage: resourceStorage,
},
)
if err != nil {
klog.ErrorS(err, "Failed to create resource synchro", "cluster", s.name, "storage resource", storageGVR)
updateSyncConditions(storageGVR, clusterv1alpha2.ResourceSyncStatusPending, "SynchroCreateFailed", fmt.Sprintf("new resource synchro failed: %s", err))
continue
}
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)

Expand Down Expand Up @@ -400,7 +414,7 @@ func (s *ClusterSynchro) refreshSyncResources() {
for storageGVR := range removedStorageGVRs {
if synchro, ok := s.storageResourceSynchros.Load(storageGVR); ok {
select {
case <-synchro.(*ResourceSynchro).Close():
case <-synchro.(resourcesynchro.Synchro).Close():
case <-s.closer:
return
}
Expand Down Expand Up @@ -475,7 +489,7 @@ func (s *ClusterSynchro) runner() {
go s.dynamicDiscovery.Start(s.handlerStopCh)

s.storageResourceSynchros.Range(func(_, value interface{}) bool {
go value.(*ResourceSynchro).Start(s.handlerStopCh)
go value.(resourcesynchro.Synchro).Start(s.handlerStopCh)
return true
})
}()
Expand Down Expand Up @@ -542,12 +556,13 @@ func (s *ClusterSynchro) genClusterStatus() *clusterv1alpha2.ClusterStatus {
gr := schema.GroupResource{Group: status.Group, Resource: resource.Name}
storageGVR := cond.StorageGVR(gr)
if value, ok := s.storageResourceSynchros.Load(storageGVR); ok {
synchro := value.(*ResourceSynchro)
if gr != synchro.syncResource.GroupResource() {
cond.SyncResource = synchro.syncResource.GroupResource().String()
synchro := value.(resourcesynchro.Synchro)
syncedGVR := synchro.GroupVersionResource()
if gr != syncedGVR.GroupResource() {
cond.SyncResource = syncedGVR.GroupResource().String()
}
if cond.Version != synchro.syncResource.Version {
cond.SyncVersion = synchro.syncResource.Version
if cond.Version != syncedGVR.Version {
cond.SyncVersion = syncedGVR.Version
}

status := synchro.Status()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,16 @@ import (
metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/runtime/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/queue"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/resourcesynchro/queue"
"github.com/clusterpedia-io/clusterpedia/pkg/utils"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ResourceSynchroConfig struct {
schema.GroupVersionResource
Kind string

cache.ListerWatcher
runtime.ObjectConvertor
storage.ResourceStorage

*kubestatemetrics.MetricsStore

ResourceVersions map[string]interface{}
PageSizeForInformer int64
}

func (c ResourceSynchroConfig) GroupVersionKind() schema.GroupVersionKind {
return c.GroupVersionResource.GroupVersion().WithKind(c.Kind)
}

type ResourceSynchro struct {
type resourceSynchro struct {
cluster string

example runtime.Object
Expand Down Expand Up @@ -88,9 +70,13 @@ type ResourceSynchro struct {
runningStage string
}

func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceSynchro {
type DefaultResourceSynchroFactory struct{}

var _ resourcesynchro.SynchroFactory = DefaultResourceSynchroFactory{}

func (factory DefaultResourceSynchroFactory) NewResourceSynchro(cluster string, config resourcesynchro.Config) (resourcesynchro.Synchro, error) {
storageConfig := config.ResourceStorage.GetStorageConfig()
synchro := &ResourceSynchro{
synchro := &resourceSynchro{
cluster: cluster,
syncResource: config.GroupVersionResource,
storageResource: storageConfig.StorageResource,
Expand Down Expand Up @@ -127,10 +113,26 @@ func newResourceSynchro(cluster string, config ResourceSynchroConfig) *ResourceS
}

synchro.setStatus(clusterv1alpha2.ResourceSyncStatusPending, "", "")
return synchro
return synchro, nil
}

func (synchro *resourceSynchro) Stage() string {
return synchro.runningStage
}

func (synchro *resourceSynchro) GroupVersionResource() schema.GroupVersionResource {
return synchro.syncResource
}

func (synchro *resourceSynchro) StoragedGroupVersionResource() schema.GroupVersionResource {
return synchro.storageResource
}

func (synchro *resourceSynchro) GetMetricsWriter() *metricsstore.MetricsWriter {
return synchro.metricsWriter
}

func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) {
func (synchro *resourceSynchro) Run(shutdown <-chan struct{}) {
defer close(synchro.closed)
go func() {
select {
Expand Down Expand Up @@ -158,7 +160,7 @@ func (synchro *ResourceSynchro) Run(shutdown <-chan struct{}) {
synchro.runningStage = "shutdown"
}

func (synchro *ResourceSynchro) Close() <-chan struct{} {
func (synchro *resourceSynchro) Close() <-chan struct{} {
synchro.closeOnce.Do(func() {
close(synchro.closer)
synchro.queue.Close()
Expand All @@ -167,7 +169,7 @@ func (synchro *ResourceSynchro) Close() <-chan struct{} {
return synchro.closed
}

func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {
func (synchro *resourceSynchro) Start(stopCh <-chan struct{}) {
synchro.startlock.Lock()
stopped := synchro.stopped // avoid race
synchro.startlock.Unlock()
Expand Down Expand Up @@ -288,7 +290,7 @@ func (synchro *ResourceSynchro) Start(stopCh <-chan struct{}) {

const LastAppliedConfigurationAnnotation = "kubectl.kubernetes.io/last-applied-configuration"

func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) {
func (synchro *resourceSynchro) pruneObject(obj *unstructured.Unstructured) {
if clusterpediafeature.FeatureGate.Enabled(features.PruneManagedFields) {
obj.SetManagedFields(nil)
}
Expand All @@ -305,7 +307,7 @@ func (synchro *ResourceSynchro) pruneObject(obj *unstructured.Unstructured) {
}
}

func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
func (synchro *resourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -327,7 +329,7 @@ func (synchro *ResourceSynchro) OnAdd(obj interface{}, isInInitialList bool) {
_ = synchro.queue.Add(obj)
}

func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
func (synchro *resourceSynchro) OnUpdate(_, obj interface{}) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -342,7 +344,7 @@ func (synchro *ResourceSynchro) OnUpdate(_, obj interface{}) {
_ = synchro.queue.Update(obj)
}

func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
func (synchro *resourceSynchro) OnDelete(obj interface{}) {
if !synchro.isRunnableForStorage.Load() {
return
}
Expand All @@ -357,9 +359,9 @@ func (synchro *ResourceSynchro) OnDelete(obj interface{}) {
_ = synchro.queue.Delete(obj)
}

func (synchro *ResourceSynchro) OnSync(obj interface{}) {}
func (synchro *resourceSynchro) OnSync(obj interface{}) {}

func (synchro *ResourceSynchro) processResources() {
func (synchro *resourceSynchro) processResources() {
for {
select {
case <-synchro.closer:
Expand All @@ -381,7 +383,7 @@ func (synchro *ResourceSynchro) processResources() {
}
}

func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
func (synchro *resourceSynchro) handleResourceEvent(event *queue.Event) {
defer func() { _ = synchro.queue.Done(event) }()

obj, ok := event.Object.(runtime.Object)
Expand Down Expand Up @@ -476,7 +478,7 @@ func (synchro *ResourceSynchro) handleResourceEvent(event *queue.Event) {
}
}

func (synchro *ResourceSynchro) setRunnableForStorage() {
func (synchro *resourceSynchro) setRunnableForStorage() {
synchro.isRunnableForStorage.Store(true)

synchro.forStorageLock.Lock()
Expand All @@ -494,7 +496,7 @@ func (synchro *ResourceSynchro) setRunnableForStorage() {
}
}

func (synchro *ResourceSynchro) setStopForStorage() {
func (synchro *resourceSynchro) setStopForStorage() {
synchro.isRunnableForStorage.Store(false)

synchro.forStorageLock.Lock()
Expand All @@ -512,7 +514,7 @@ func (synchro *ResourceSynchro) setStopForStorage() {
}
}

func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) {
func (synchro *resourceSynchro) convertToStorageVersion(obj runtime.Object) (runtime.Object, error) {
if synchro.syncResource == synchro.storageResource || synchro.convertor == nil {
return obj, nil
}
Expand All @@ -535,27 +537,27 @@ func (synchro *ResourceSynchro) convertToStorageVersion(obj runtime.Object) (run
return obj, nil
}

func (synchro *ResourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) createOrUpdateResource(ctx context.Context, obj runtime.Object) error {
err := synchro.storage.Create(ctx, synchro.cluster, obj)
if genericstorage.IsExist(err) {
return synchro.storage.Update(ctx, synchro.cluster, obj)
}
return err
}

func (synchro *ResourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) updateOrCreateResource(ctx context.Context, obj runtime.Object) error {
err := synchro.storage.Update(ctx, synchro.cluster, obj)
if genericstorage.IsNotFound(err) {
return synchro.storage.Create(ctx, synchro.cluster, obj)
}
return err
}

func (synchro *ResourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error {
func (synchro *resourceSynchro) deleteResource(ctx context.Context, obj runtime.Object) error {
return synchro.storage.Delete(ctx, synchro.cluster, obj)
}

func (synchro *ResourceSynchro) setStatus(status string, reason, message string) {
func (synchro *resourceSynchro) setStatus(status string, reason, message string) {
synchro.status.Store(clusterv1alpha2.ClusterResourceSyncCondition{
Status: status,
Reason: reason,
Expand All @@ -564,11 +566,11 @@ func (synchro *ResourceSynchro) setStatus(status string, reason, message string)
})
}

func (synchro *ResourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition {
func (synchro *resourceSynchro) Status() clusterv1alpha2.ClusterResourceSyncCondition {
return synchro.status.Load().(clusterv1alpha2.ClusterResourceSyncCondition)
}

func (synchro *ResourceSynchro) ErrorHandler(r *informer.Reflector, err error) {
func (synchro *resourceSynchro) ErrorHandler(r *informer.Reflector, err error) {
if err != nil {
// TODO(iceber): Use `k8s.io/apimachinery/pkg/api/errors` to resolve the error type and update it to `status.Reason`
synchro.setStatus(clusterv1alpha2.ResourceSyncStatusError, "ResourceWatchFailed", err.Error())
Expand Down
Loading
Loading