diff --git a/pkg/storage/memorystorage/v2/README.md b/pkg/storage/memorystorage/v2/README.md new file mode 100644 index 000000000..f372a73e5 --- /dev/null +++ b/pkg/storage/memorystorage/v2/README.md @@ -0,0 +1,28 @@ +# Memory Storage Layer v2 (Alpha) +Due to significant updates and modifications required for the memory storage layer, version 2 (v2) has been introduced. Unless otherwise necessary, memory v1 will only be supported in Clusterpedia 0.x. + +⚠️ The current memory v2 is in the alpha stage, has not undergone rigorous testing, and the foundational storage layer functionalities will be gradually improved and implemented. + +This version draws inspiration from the apiserver/storage/cache library but does not necessarily follow all its design principles. + +### Major Changes Compared to v1 +#### ResourceVersion Format Changes +In v1, the resource version used a base64 encoded JSON format to merge the resource versions of each cluster into the resource's resource version field. + +In v2, the resource version format is `<prefix>.<incrementing integer>.<original resource version>`: +* An incrementing integer is used to represent the sequential order of resources, for version operations during List and Watch. +* The original resource version is retained. +* The prefix is used to identify the validity of the incrementing integer when requests switch between instances. + +For Watch requests, a JSON formatted `ListOptions.ResourceVersion = {"<prefix>": "<incrementing integer>"}` is supported to maintain continuity of Watch requests when switching instances. +> The clusterpedia version of Informer is required to replace the k8s.io/client-go Informer. + +#### Using a Dedicated Resource Synchro +Due to the unique nature of the memory storage layer, it is unnecessary to use the default resource synchronizer of ClusterSynchro to maintain the informer store. + +Memory v2 will directly use the native k8s informer as the resource synchronizer. 
Memory v2 will act as the Store for the k8s Informer, saving data directly into storage, avoiding intermediate operations and memory usage. + +#### Supporting Dual Data Source Updates +In addition to the resource synchronizer saving resources in memory v2, external active additions, deletions, and modifications of memory v2 resources are also supported. + +This ensures consistency of requests when supporting write operations at the apiserver layer through dual-write operations. \ No newline at end of file diff --git a/pkg/storage/memorystorage/v2/cache_watcher.go b/pkg/storage/memorystorage/v2/cache_watcher.go new file mode 100644 index 000000000..f8ed328db --- /dev/null +++ b/pkg/storage/memorystorage/v2/cache_watcher.go @@ -0,0 +1,178 @@ +package memorystorage + +import ( + "context" + "time" + + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// cacheWatcher implements watch.Interface +type cacheWatcher struct { + input chan *watchCacheEvent + result chan watch.Event + + filter filterWithAttrsFunc + + stopped bool + done chan struct{} + forget func() +} + +type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool + +func newCacheWatcher(chanSize int, filter filterWithAttrsFunc) *cacheWatcher { + return &cacheWatcher{ + input: make(chan *watchCacheEvent, chanSize), + result: make(chan watch.Event, chanSize), + done: make(chan struct{}), + filter: filter, + forget: func() {}, + stopped: false, + } +} + +// ResultChan implements watch.Interface. +func (c *cacheWatcher) ResultChan() <-chan watch.Event { + return c.result +} + +// Stop implements watch.Interface. 
+func (c *cacheWatcher) Stop() { + c.forget() +} + +func (c *cacheWatcher) stopLocked() { + if !c.stopped { + c.stopped = true + close(c.done) + close(c.input) + } +} + +func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { + select { + case c.input <- event: + return true + default: + return false + } +} + +// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) +func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { + // Try to send the event immediately, without blocking. + if c.nonblockingAdd(event) { + return true + } + + closeFunc := func() { + c.forget() + } + + if timer == nil { + closeFunc() + return false + } + + select { + case c.input <- event: + return true + case <-timer.C: + closeFunc() + return false + } +} + +func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { + curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) + var oldObjPasses bool + if event.PrevObject != nil { + oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) + } + if !curObjPasses && !oldObjPasses { + return nil + } + + switch { + case curObjPasses && !oldObjPasses: + return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} + case curObjPasses && oldObjPasses: + return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} + + case !curObjPasses && oldObjPasses: + oldObj := event.PrevObject.DeepCopyObject() + return &watch.Event{Type: watch.Deleted, Object: oldObj} + } + return nil +} + +func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { + watchEvent := c.convertToWatchEvent(event) + if watchEvent == nil { + // Watcher is not interested in that object. + return + } + + // We need to ensure that if we put event X to the c.result, all + // previous events were already put into it before, no matter whether + // c.done is close or not. 
+ // Thus we cannot simply select from c.done and c.result and this + // would give us non-determinism. + // At the same time, we don't want to block infinitely on putting + // to c.result, when c.done is already closed. + + // This ensures that with c.done already close, we at most once go + // into the next select after this. With that, no matter which + // statement we choose there, we will deliver only consecutive + // events. + select { + case <-c.done: + return + default: + } + + select { + case c.result <- *watchEvent: + case <-c.done: + } +} + +func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, indexRV uint64) { + defer utilruntime.HandleCrash() + + defer close(c.result) + defer c.Stop() + + for { + event, err := cacheInterval.Next() + if err != nil { + return + } + if event == nil { + break + } + c.sendWatchCacheEvent(event) + + if event.Index > indexRV { + indexRV = event.Index + } + } + + for { + select { + case event, ok := <-c.input: + if !ok { + return + } + if event.Index > indexRV { + c.sendWatchCacheEvent(event) + } + case <-ctx.Done(): + return + } + } +} diff --git a/pkg/storage/memorystorage/v2/register.go b/pkg/storage/memorystorage/v2/register.go new file mode 100644 index 000000000..216b98bfb --- /dev/null +++ b/pkg/storage/memorystorage/v2/register.go @@ -0,0 +1,18 @@ +package memorystorage + +import ( + "github.com/clusterpedia-io/clusterpedia/pkg/storage" +) + +const ( + StorageName = "memory.v2/alpha" +) + +func init() { + storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory) +} + +func NewStorageFactory(_ string) (storage.StorageFactory, error) { + storageFactory := &StorageFactory{} + return storageFactory, nil +} diff --git a/pkg/storage/memorystorage/v2/resource_storage.go b/pkg/storage/memorystorage/v2/resource_storage.go new file mode 100644 index 000000000..3dcb9986a --- /dev/null +++ b/pkg/storage/memorystorage/v2/resource_storage.go @@ -0,0 +1,584 @@ +package memorystorage 
+ +import ( + "context" + "errors" + "fmt" + "reflect" + "sort" + "sync" + "time" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/utils/clock" + + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" +) + +type watchersMap map[int]*cacheWatcher + +func (wm watchersMap) addWatcher(w *cacheWatcher, number int) { + wm[number] = w +} + +func (wm watchersMap) deleteWatcher(number int) { + delete(wm, number) +} + +func (wm watchersMap) terminateAll(done func(*cacheWatcher)) { + for key, watcher := range wm { + delete(wm, key) + done(watcher) + } +} + +type namespacedName struct { + namespace string + name string +} + +type indexedWatchers struct { + allWatchers map[namespacedName]watchersMap +} + +func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespacedName) { + scopedWatchers, ok := i.allWatchers[scope] + if !ok { + scopedWatchers = watchersMap{} + i.allWatchers[scope] = scopedWatchers + } + scopedWatchers.addWatcher(w, number) +} + +func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName) { + i.allWatchers[scope].deleteWatcher(number) + if len(i.allWatchers[scope]) == 0 { + delete(i.allWatchers, scope) + } +} + +type ResourceStorage struct { + storageConfig *storage.ResourceStorageConfig + + keyFunc func(runtime.Object) (string, error) + storeLock sync.RWMutex + stores map[string]cache.Indexer + + cacheLock sync.RWMutex + capacity int + cache []*watchCacheEvent + startIndex int + endIndex int + + timer *time.Timer + + cond *sync.Cond + clock clock.Clock + + incoming chan watchCacheEvent + + watcherIdx int + watcherLock sync.RWMutex + watchers indexedWatchers + + dispatching bool + watchersBuffer []*cacheWatcher + blockedWatchers []*cacheWatcher + watchersToStop 
[]*cacheWatcher + + stopCh chan struct{} +} + +func (s *ResourceStorage) cleanCluster(cluster string) { + // When cleaning up a cluster, support is needed to configure the time to send + // the removal of all resources in the cluster or a cluster removal event. + // If a cluster removal event is sent, then the client informer needs to be adapted. +} + +func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig { + return s.storageConfig +} + +var accessor = meta.NewAccessor() + +func (s *ResourceStorage) getOrCreateClusterStore(cluster string) cache.Indexer { + s.storeLock.RLock() + store := s.stores[cluster] + s.storeLock.RUnlock() + if store != nil { + return store + } + + s.storeLock.Lock() + defer s.storeLock.Unlock() + store = cache.NewIndexer(storeElementKey, nil) + s.stores[cluster] = store + return store +} + +func (s *ResourceStorage) Create(ctx context.Context, cluster string, obj runtime.Object) error { + store := s.getOrCreateClusterStore(cluster) + return s.processObject(store, obj, watch.Added, + func(o *storeElement) error { return store.Add(store) }, + ) +} + +func (s *ResourceStorage) Update(ctx context.Context, cluster string, obj runtime.Object) error { + store := s.getOrCreateClusterStore(cluster) + return s.processObject(store, obj, watch.Modified, + func(o *storeElement) error { return store.Update(store) }, + ) +} + +func (s *ResourceStorage) ConvertDeletedObject(obj interface{}) (runobj runtime.Object, _ error) { + return nil, nil +} + +func (s *ResourceStorage) Delete(ctx context.Context, cluster string, obj runtime.Object) error { + store := s.getOrCreateClusterStore(cluster) + return s.processObject(store, obj, watch.Added, + func(o *storeElement) error { return store.Delete(store) }, + ) +} + +func (s *ResourceStorage) processObject(store cache.Store, obj runtime.Object, action watch.EventType, updateFunc func(*storeElement) error) error { + rv, _ := accessor.ResourceVersion(obj) + indexRV, memoryRV := 
ConvertResourceVersionToMemoryResourceVersion(rv) + if err := accessor.SetResourceVersion(obj, memoryRV); err != nil { + return err + } + + key, err := s.keyFunc(obj) + if err != nil { + return err + } + elem := &storeElement{Key: key, Object: obj} + elem.Labels, elem.Annotations, _, err = getObjectAttrs(obj) + if err != nil { + return err + } + + wcEvent := &watchCacheEvent{ + Type: action, + Key: key, + Object: elem.Object, + ObjLabels: elem.Labels, + IndexRV: indexRV, + ResourceVersion: memoryRV, + RecordTime: s.clock.Now(), + } + + previous, exists, err := store.GetByKey(key) + if err != nil { + return err + } + if exists { + previousElem := previous.(*storeElement) + wcEvent.PrevObject = previousElem.Object + wcEvent.PrevObjLabels = previousElem.Labels + } + + if err := func() error { + s.cacheLock.Lock() + defer s.cacheLock.Unlock() + s.updateCache(wcEvent) + return updateFunc(elem) + }(); err != nil { + return err + } + + s.eventHandler(wcEvent) + return nil +} + +func (s *ResourceStorage) updateCache(event *watchCacheEvent) { + s.resizeCacheLocked(event.RecordTime) + if s.isCacheFullLocked() { + // Cache is full - remove the oldest element. 
+ s.startIndex++ + } + s.cache[s.endIndex%s.capacity] = event + s.endIndex++ +} + +const blockTimeout = 3 * time.Second + +func (s *ResourceStorage) waitUntilFreshAndBlock(ctx context.Context, rvIndex uint64, rv uint64) error { + startTime := s.clock.Now() + + if rvIndex > 0 { + go func() { + <-s.clock.After(blockTimeout) + s.cond.Broadcast() + }() + } + + for versioner.Load() < rvIndex { + if s.clock.Since(startTime) >= blockTimeout { + return nil + // return storage.NewTooLargeResourceVersionError(resourceVersion, w.resourceVersion, resourceVersionTooHighRetrySeconds) + } + s.cond.Wait() + } + return nil +} + +func (s *ResourceStorage) Get(ctx context.Context, cluster, namespace, name string, into runtime.Object) error { + s.storeLock.RLock() + store := s.stores[cluster] + if store == nil { + s.storeLock.RUnlock() + return errors.New("not found") + } + s.storeLock.RUnlock() + + obj, exists, err := store.GetByKey(fmt.Sprintf("%s/%s", namespace, name)) + if err != nil { + return err + } + objVal, err := conversion.EnforcePtr(into) + if !exists { + objVal.Set(reflect.Zero(objVal.Type())) + return errors.New("not found") + } + + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + objVal.Set(reflect.ValueOf(elem.Object).Elem()) + return nil +} + +func (s *ResourceStorage) List(ctx context.Context, listObject runtime.Object, opts *internal.ListOptions) error { + var indexRV uint64 + var objs []runtime.Object + for _, store := range s.stores { + for _, obj := range store.List() { + elem, ok := obj.(*storeElement) + if !ok { + return fmt.Errorf("non *storeElement returned from storage: %v", obj) + } + if elem.IndexRV > indexRV { + indexRV = elem.IndexRV + } + // filter elem + objs = append(objs, elem.Object.DeepCopyObject()) + } + } + + listPtr, err := meta.GetItemsPtr(listObject) + if err != nil { + return err + } + listVal, err := conversion.EnforcePtr(listPtr) + if err != nil { + return err + } + if 
listVal.Kind() != reflect.Slice { + return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) + } + listVal.Set(reflect.MakeSlice(listVal.Type(), len(objs), len(objs))) + for i, o := range objs { + listVal.Index(i).Set(reflect.ValueOf(o).Elem()) + } + if err := accessor.SetResourceVersion(listObject, ConvertToMemoryResourceVersionForList(indexRV)); err != nil { + return err + } + return nil +} + +func (s *ResourceStorage) Watch(ctx context.Context, options *internal.ListOptions) (watch.Interface, error) { + indexRV, err := ConvertMemoryResourceVersionToResourceVersionForList(options.ResourceVersion) + + scope := namespacedName{} + watcher := newCacheWatcher(100, func(key string, _ labels.Set, f fields.Set) bool { return true }) + cacheInterval, err := s.getAllEventsSinceLocked(indexRV) + if err != nil { + return nil, errors.New("") + } + + func() { + s.watcherLock.Lock() + defer s.watcherLock.Unlock() + + watcher.forget = forgetWatcher(s, watcher, s.watcherIdx, scope) + s.watchers.addWatcher(watcher, s.watcherIdx, scope) + s.watcherIdx++ + }() + + go watcher.processInterval(ctx, cacheInterval, indexRV) + return watcher, nil +} + +func (s *ResourceStorage) getAllEventsSinceLocked(indexRV uint64) (*watchCacheInterval, error) { + size := s.endIndex - s.startIndex + var oldest uint64 + switch { + case size > 0: + oldest = s.cache[s.startIndex%s.capacity].IndexRV + default: + return nil, errors.New("") + } + + if indexRV == 0 { + indexRV = versioner.Load() + } + if indexRV < oldest-1 { + return nil, errors.New("") + } + + f := func(i int) bool { + return s.cache[(s.startIndex+i)%s.capacity].IndexRV > indexRV + } + first := sort.Search(size, f) + indexerFunc := func(i int) *watchCacheEvent { + return s.cache[i%s.capacity] + } + indexValidator := func(i int) bool { + return i >= s.startIndex + } + ci := newCacheInterval(s.startIndex+first, s.endIndex, indexerFunc, indexValidator, s.watcherLock.RLocker()) + return ci, nil +} + +func (c *ResourceStorage) 
eventHandler(event *watchCacheEvent) { + c.incoming <- *event +} + +func (c *ResourceStorage) dispatchEvents() { + for { + select { + case event, ok := <-c.incoming: + if !ok { + return + } + c.dispatchEvent(&event) + case <-c.stopCh: + return + } + } +} + +func (c *ResourceStorage) dispatchEvent(event *watchCacheEvent) { + c.startDispatching(event) + defer c.finishDispatching() + + wcEvent := *event + event = &wcEvent + + c.blockedWatchers = c.blockedWatchers[:0] + for _, watcher := range c.watchersBuffer { + if !watcher.nonblockingAdd(event) { + c.blockedWatchers = append(c.blockedWatchers, watcher) + } + } + + if len(c.blockedWatchers) > 0 { + timeout := 50 * time.Millisecond + c.timer.Reset(timeout) + + timer := c.timer + for _, watcher := range c.blockedWatchers { + if !watcher.add(event, timer) { + timer = nil + } + } + if timer != nil && !timer.Stop() { + <-timer.C + } + } +} + +func (c *ResourceStorage) startDispatching(event *watchCacheEvent) { + c.watcherLock.Lock() + defer c.watcherLock.Unlock() + c.dispatching = true + c.watchersBuffer = c.watchersBuffer[:0] + + namespace := event.ObjFields["metadata.namespace"] + name := event.ObjFields["metadata.name"] + if len(namespace) > 0 { + if len(name) > 0 { + for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace, name: name}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + for _, watcher := range c.watchers.allWatchers[namespacedName{namespace: namespace}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + if len(name) > 0 { + for _, watcher := range c.watchers.allWatchers[namespacedName{name: name}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } + } + + for _, watcher := range c.watchers.allWatchers[namespacedName{}] { + c.watchersBuffer = append(c.watchersBuffer, watcher) + } +} + +func (c *ResourceStorage) finishDispatching() { + c.watcherLock.Lock() + defer c.watcherLock.Unlock() + c.dispatching = false + for _, watcher := range 
c.watchersToStop { + watcher.stopLocked() + } + + c.watchersToStop = c.watchersToStop[:0] +} + +func (c *ResourceStorage) stopWatcherLocked(watcher *cacheWatcher) { + if c.dispatching { + c.watchersToStop = append(c.watchersToStop, watcher) + } else { + watcher.stopLocked() + } +} + +func forgetWatcher(c *ResourceStorage, w *cacheWatcher, index int, scope namespacedName) func() { + return func() { + c.watcherLock.Lock() + defer c.watcherLock.Unlock() + c.watchers.deleteWatcher(index, scope) + c.stopWatcherLocked(w) + } +} + +const ( + eventFreshDuration = 75 * time.Second + + lowerBoundCapacity = 100 + upperBoundCapacity = 100 * 1024 +) + +func (s *ResourceStorage) resizeCacheLocked(eventTime time.Time) { + if s.isCacheFullLocked() && eventTime.Sub(s.cache[s.startIndex%s.capacity].RecordTime) < eventFreshDuration { + capacity := min(s.capacity*2, upperBoundCapacity) + if capacity > s.capacity { + s.doCacheResizeLocked(capacity) + } + return + } + if s.isCacheFullLocked() && eventTime.Sub(s.cache[(s.endIndex-s.capacity/4)%s.capacity].RecordTime) > eventFreshDuration { + capacity := max(s.capacity/2, lowerBoundCapacity) + if capacity < s.capacity { + s.doCacheResizeLocked(capacity) + } + return + } +} + +// isCacheFullLocked used to judge whether watchCacheEvent is full. +// Assumes that lock is already held for write. +func (s *ResourceStorage) isCacheFullLocked() bool { + return s.endIndex == s.startIndex+s.capacity +} + +// doCacheResizeLocked resize watchCache's event array with different capacity. +// Assumes that lock is already held for write. +func (s *ResourceStorage) doCacheResizeLocked(capacity int) { + newCache := make([]*watchCacheEvent, capacity) + if capacity < s.capacity { + // adjust startIndex if cache capacity shrink. 
+ s.startIndex = s.endIndex - capacity + } + for i := s.startIndex; i < s.endIndex; i++ { + newCache[i%capacity] = s.cache[i%s.capacity] + } + s.cache = newCache + s.capacity = capacity +} + +type storeElement struct { + Key string + IndexRV uint64 + Object runtime.Object + Labels labels.Set + Annotations labels.Set +} + +func storeElementKey(obj interface{}) (string, error) { + elem, ok := obj.(*storeElement) + if !ok { + return "", fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Key, nil +} + +func storeElementObject(obj interface{}) (runtime.Object, error) { + elem, ok := obj.(*storeElement) + if !ok { + return nil, fmt.Errorf("not a storeElement: %v", obj) + } + return elem.Object, nil +} + +func storeElementIndexFunc(objIndexFunc cache.IndexFunc) cache.IndexFunc { + return func(obj interface{}) (strings []string, e error) { + seo, err := storeElementObject(obj) + if err != nil { + return nil, err + } + return objIndexFunc(seo) + } +} + +func storeElementIndexers(indexers *cache.Indexers) cache.Indexers { + if indexers == nil { + return cache.Indexers{} + } + ret := cache.Indexers{} + for indexName, indexFunc := range *indexers { + ret[indexName] = storeElementIndexFunc(indexFunc) + } + return ret +} + +func getObjectAttrs(obj runtime.Object) (labels.Set, labels.Set, fields.Set, error) { + accessor, err := meta.Accessor(obj) + if err != nil { + return nil, nil, nil, err + } + + objLabels := accessor.GetLabels() + if objLabels == nil { + objLabels = make(map[string]string) + } + labelSet := labels.Set(objLabels) + + annotations := accessor.GetAnnotations() + if objLabels == nil { + objLabels = make(map[string]string) + } + annotationSet := labels.Set(annotations) + + return labelSet, annotationSet, nil, nil +} + +type watchCacheEvent struct { + Type watch.EventType + Object runtime.Object + + Key string + IndexRV uint64 + ResourceVersion string + + ObjLabels labels.Set + ObjFields fields.Set + + PrevObject runtime.Object + PrevObjLabels labels.Set + 
PrevObjFields fields.Set + + RecordTime time.Time +} diff --git a/pkg/storage/memorystorage/v2/storage.go b/pkg/storage/memorystorage/v2/storage.go new file mode 100644 index 000000000..9a3a75bee --- /dev/null +++ b/pkg/storage/memorystorage/v2/storage.go @@ -0,0 +1,39 @@ +package memorystorage + +import ( + "context" + + internal "github.com/clusterpedia-io/api/clusterpedia" + "github.com/clusterpedia-io/clusterpedia/pkg/storage" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type StorageFactory struct{} + +func (s *StorageFactory) NewResourceStorage(config *storage.ResourceStorageConfig) (storage.ResourceStorage, error) { + return nil, nil +} + +func (s *StorageFactory) GetSupportedRequestVerbs() []string { + return []string{"get", "list", "watch"} +} + +func (s *StorageFactory) NewCollectionResourceStorage(cr *internal.CollectionResource) (storage.CollectionResourceStorage, error) { + return nil, nil +} + +func (s *StorageFactory) GetResourceVersions(ctx context.Context, cluster string) (map[schema.GroupVersionResource]map[string]interface{}, error) { + return nil, nil +} + +func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error { + return nil +} + +func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster string, gvr schema.GroupVersionResource) error { + return nil +} + +func (s *StorageFactory) GetCollectionResources(ctx context.Context) ([]*internal.CollectionResource, error) { + return nil, nil +} diff --git a/pkg/storage/memorystorage/v2/versioner.go b/pkg/storage/memorystorage/v2/versioner.go new file mode 100644 index 000000000..36a325376 --- /dev/null +++ b/pkg/storage/memorystorage/v2/versioner.go @@ -0,0 +1,61 @@ +package memorystorage + +import ( + "fmt" + "os" + "strconv" + "strings" + "sync/atomic" + + "k8s.io/apimachinery/pkg/util/rand" +) + +var versioner atomic.Uint64 + +var prefix string + +func init() { + prefix = os.Getenv("MEMORY_STORAGE_RESOURCE_VERSION_PREFIX") + if prefix == "" { + prefix = 
rand.String(8) + } +} + +func ConvertResourceVersionToMemoryResourceVersion(rv string) (uint64, string) { + indexRV := versioner.Add(1) + return indexRV, fmt.Sprintf("%s.%d.%s", prefix, indexRV, rv) +} + +func ConvertMemoryResourceVersionToResourceVersion(rv string) (uint64, string, error) { + s := strings.Split(rv, ".") + if len(s) != 3 { + return 0, "", fmt.Errorf("invalid resource version: %s", s) + } + if s[0] != prefix { + return 0, "", fmt.Errorf("invalid resource version, prefix is not match: %s", s) + } + i, err := strconv.ParseUint(s[1], 10, 0) + if err != nil { + return 0, "", fmt.Errorf("invalid resource version, %v: %s", err, s) + } + return i, s[2], nil +} + +func ConvertToMemoryResourceVersionForList(indexRV uint64) string { + return fmt.Sprintf("%s.%d", prefix, indexRV) +} + +func ConvertMemoryResourceVersionToResourceVersionForList(rv string) (uint64, error) { + s := strings.Split(rv, ".") + if len(s) != 2 { + return 0, fmt.Errorf("invalid resource version: %s", s) + } + if s[0] != prefix { + return 0, fmt.Errorf("invalid resource version, prefix is not match: %s", s) + } + i, err := strconv.ParseUint(s[1], 10, 0) + if err != nil { + return 0, fmt.Errorf("invalid resource version, %v: %s", err, s) + } + return i, nil +} diff --git a/pkg/storage/memorystorage/v2/watch_cache_interval.go b/pkg/storage/memorystorage/v2/watch_cache_interval.go new file mode 100644 index 000000000..a554b7b53 --- /dev/null +++ b/pkg/storage/memorystorage/v2/watch_cache_interval.go @@ -0,0 +1,186 @@ +package memorystorage + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" +) + +// watchCacheInterval serves as an abstraction over a source +// of watchCacheEvents. It maintains a window of events over +// an underlying source and these events can be served using +// the exposed Next() API. 
The main intent for doing things +// this way is to introduce an upper bound of memory usage +// for starting a watch and reduce the maximum possible time +// interval for which the lock would be held while events are +// copied over. +// +// The source of events for the interval is typically either +// the watchCache circular buffer, if events being retrieved +// need to be for resource versions > 0 or the underlying +// implementation of Store, if resource version = 0. +// +// Furthermore, an interval can be either valid or invalid at +// any given point of time. The notion of validity makes sense +// only in cases where the window of events in the underlying +// source can change over time - i.e. for watchCache circular +// buffer. When the circular buffer is full and an event needs +// to be popped off, watchCache::startIndex is incremented. In +// this case, an interval tracking that popped event is valid +// only if it has already been copied to its internal buffer. +// However, for efficiency we perform that lazily and we mark +// an interval as invalid iff we need to copy events from the +// watchCache and we end up needing events that have already +// been popped off. This translates to the following condition: +// +// watchCacheInterval::startIndex >= watchCache::startIndex. +// +// When this condition becomes false, the interval is no longer +// valid and should not be used to retrieve and serve elements +// from the underlying source. +type watchCacheInterval struct { + // startIndex denotes the starting point of the interval + // being considered. The value is the index in the actual + // source of watchCacheEvents. If the source of events is + // the watchCache, then this must be used modulo capacity. + startIndex int + + // endIndex denotes the ending point of the interval being + // considered. The value is the index in the actual source + // of events. If the source of the events is the watchCache, + // then this should be used modulo capacity. 
+ endIndex int + + // indexer is meant to inject behaviour for how an event must + // be retrieved from the underlying source given an index. + indexer indexerFunc + + // indexValidator is used to check if a given index is still + // valid perspective. If it is deemed that the index is not + // valid, then this interval can no longer be used to serve + // events. Use of indexValidator is warranted only in cases + // where the window of events in the underlying source can + // change over time. Furthermore, an interval is invalid if + // its startIndex no longer coincides with the startIndex of + // underlying source. + indexValidator indexValidator + + // buffer holds watchCacheEvents that this interval returns on + // a call to Next(). This exists mainly to reduce acquiring the + // lock on each invocation of Next(). + buffer *watchCacheIntervalBuffer + + // lock effectively protects access to the underlying source + // of events through - indexer and indexValidator. + // + // Given that indexer and indexValidator only read state, if + // possible, Locker obtained through RLocker() is provided. 
+ lock sync.Locker +} + +type attrFunc func(runtime.Object) (labels.Set, fields.Set, error) +type indexerFunc func(int) *watchCacheEvent +type indexValidator func(int) bool + +func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval { + return &watchCacheInterval{ + startIndex: startIndex, + endIndex: endIndex, + indexer: indexer, + indexValidator: indexValidator, + buffer: &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)}, + lock: locker, + } +} + +type sortableWatchCacheEvents []*watchCacheEvent + +func (s sortableWatchCacheEvents) Len() int { + return len(s) +} + +func (s sortableWatchCacheEvents) Less(i, j int) bool { + return s[i].Key < s[j].Key +} + +func (s sortableWatchCacheEvents) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + +// Next returns the next item in the cache interval provided the cache +// interval is still valid. An error is returned if the interval is +// invalidated. +func (wci *watchCacheInterval) Next() (*watchCacheEvent, error) { + // if there are items in the buffer to return, return from + // the buffer. + if event, exists := wci.buffer.next(); exists { + return event, nil + } + // check if there are still other events in this interval + // that can be processed. 
+ if wci.startIndex >= wci.endIndex { + return nil, nil + } + wci.lock.Lock() + defer wci.lock.Unlock() + + if valid := wci.indexValidator(wci.startIndex); !valid { + return nil, fmt.Errorf("cache interval invalidated, interval startIndex: %d", wci.startIndex) + } + + wci.fillBuffer() + if event, exists := wci.buffer.next(); exists { + return event, nil + } + return nil, nil +} + +func (wci *watchCacheInterval) fillBuffer() { + wci.buffer.startIndex = 0 + wci.buffer.endIndex = 0 + for wci.startIndex < wci.endIndex && !wci.buffer.isFull() { + event := wci.indexer(wci.startIndex) + if event == nil { + break + } + wci.buffer.buffer[wci.buffer.endIndex] = event + wci.buffer.endIndex++ + wci.startIndex++ + } +} + +const bufferSize = 100 + +// watchCacheIntervalBuffer is used to reduce acquiring +// the lock on each invocation of watchCacheInterval.Next(). +type watchCacheIntervalBuffer struct { + // buffer is used to hold watchCacheEvents that + // the interval returns on a call to Next(). + buffer []*watchCacheEvent + // The first element of buffer is defined by startIndex, + // its last element is defined by endIndex. + startIndex int + endIndex int +} + +// next returns the next event present in the interval buffer provided +// it is not empty. +func (wcib *watchCacheIntervalBuffer) next() (*watchCacheEvent, bool) { + if wcib.isEmpty() { + return nil, false + } + next := wcib.buffer[wcib.startIndex] + wcib.startIndex++ + return next, true +} + +func (wcib *watchCacheIntervalBuffer) isFull() bool { + return wcib.endIndex >= bufferSize +} + +func (wcib *watchCacheIntervalBuffer) isEmpty() bool { + return wcib.startIndex == wcib.endIndex +}