add memory storage layer v2
Signed-off-by: Iceber Gu <[email protected]>
Iceber committed Nov 27, 2024
1 parent df2a249 commit 44280d6
Showing 8 changed files with 1,096 additions and 0 deletions.
5 changes: 5 additions & 0 deletions .golangci.yml
@@ -35,3 +35,8 @@ linters-settings:
- "-SA1019" # disable the rule SA1019(Using a deprecated function, variable, constant or field)
output:
sort-results: true

issues:
exclude-rules:
- path: pkg/storage/memorystorage/v2/
linters: unused # memory v2 is in alpha and tolerates unused functions or fields
28 changes: 28 additions & 0 deletions pkg/storage/memorystorage/v2/README.md
@@ -0,0 +1,28 @@
# Memory Storage Layer v2 (Alpha)
The memory storage layer requires significant updates and modifications, so version 2 (v2) has been introduced. Unless otherwise necessary, memory v1 will be supported only in Clusterpedia 0.x.

⚠️ Memory v2 is currently in the alpha stage and has not undergone rigorous testing; the foundational storage layer functionality will be implemented and improved gradually.

This version draws inspiration from the [apiserver/storage/cacher](https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/apiserver/pkg/storage/cacher) library but does not necessarily follow all its design principles.

### Major Changes Compared to v1
#### ResourceVersion Format Changes
In v1, the resource versions of all clusters were merged into a single base64-encoded JSON object and stored in the resource's resource version field.
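
For comparison, a minimal sketch of the v1-style encoding just described (illustrative only, not the actual v1 code):

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

func main() {
	// Per-cluster resource versions merged into one JSON object...
	clusterRVs := map[string]string{"cluster-1": "123456", "cluster-2": "654321"}
	raw, _ := json.Marshal(clusterRVs)
	// ...then base64-encoded into the single resourceVersion field.
	fmt.Println(base64.StdEncoding.EncodeToString(raw))
}
```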

In v2, the resource version format is `<prefix>.<increase int>.<original resource version>` (illustrated in the sketch after this list):
* An incrementing integer represents the sequential order of resources and is used for resource version comparisons during List and Watch operations.
* The original resource version is retained.
* The prefix identifies whether the incrementing integer is still valid when requests switch between instances.
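
A minimal sketch of composing and parsing this format; `buildResourceVersion` and `parseResourceVersion` are hypothetical helpers, not names from this commit:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// buildResourceVersion joins the three parts described above.
func buildResourceVersion(prefix string, index uint64, original string) string {
	return fmt.Sprintf("%s.%d.%s", prefix, index, original)
}

// parseResourceVersion splits a v2 resource version back into its parts.
func parseResourceVersion(rv string) (prefix string, index uint64, original string, err error) {
	parts := strings.SplitN(rv, ".", 3)
	if len(parts) != 3 {
		return "", 0, "", fmt.Errorf("invalid resource version %q", rv)
	}
	index, err = strconv.ParseUint(parts[1], 10, 64)
	if err != nil {
		return "", 0, "", fmt.Errorf("invalid index in resource version %q: %w", rv, err)
	}
	return parts[0], index, parts[2], nil
}

func main() {
	rv := buildResourceVersion("inst-1", 42, "123456")
	fmt.Println(rv) // inst-1.42.123456
}
```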

For Watch requests, a JSON-formatted `ListOptions.ResourceVersion = {"<cluster>": "<resource version>"}` is supported, so that Watch requests can continue seamlessly when switching between instances.
> The Clusterpedia version of Informer is required in place of the k8s.io/client-go Informer.
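
As a rough illustration, a client switching instances could construct the JSON-formatted `ResourceVersion` like this (the cluster names are made up):

```go
package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Per-cluster resource versions observed before switching instances.
	clusterRVs := map[string]string{"cluster-1": "123456", "cluster-2": "654321"}
	raw, err := json.Marshal(clusterRVs)
	if err != nil {
		panic(err)
	}
	opts := metav1.ListOptions{Watch: true, ResourceVersion: string(raw)}
	fmt.Println(opts.ResourceVersion) // {"cluster-1":"123456","cluster-2":"654321"}
}
```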
#### Using a Dedicated Resource Synchro
Due to the unique nature of the memory storage layer, there is no need to use ClusterSynchro's default resource synchronizer to maintain the informer store.

Memory v2 uses the native k8s informer directly as the resource synchronizer: memory v2 acts as the Store of the k8s informer, saving data directly into storage and avoiding intermediate operations and extra memory usage. A sketch of this pattern follows.
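
A minimal sketch of the pattern under assumed names (`storageBackedStore` is hypothetical; this is not the actual memory v2 code): the reflector, the list-watch core of an informer, writes straight into a custom `cache.Store`:

```go
package sketch

import (
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// storageBackedStore embeds a basic store and intercepts writes so that
// objects can be persisted directly into the storage layer.
type storageBackedStore struct {
	cache.Store
}

func (s *storageBackedStore) Add(obj interface{}) error {
	// Persist obj into the storage layer here, then keep the embedded
	// store's bookkeeping (Update/Delete/Replace can be wrapped the same way).
	return s.Store.Add(obj)
}

func runSynchro(client kubernetes.Interface, stopCh <-chan struct{}) {
	store := &storageBackedStore{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
	lw := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), "pods", "", fields.Everything())
	// The reflector lists and watches pods, writing events directly into our store.
	reflector := cache.NewReflector(lw, &corev1.Pod{}, store, 30*time.Second)
	go reflector.Run(stopCh)
}
```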

#### Supporting Dual Data Source Updates
In addition to the resource synchronizer saving resources into memory v2, external callers can also actively add, delete, and modify memory v2 resources.

This keeps requests consistent when write operations at the apiserver layer are supported through dual writes, as the sketch below illustrates.
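
A hedged sketch of the dual-write idea; the `Writer` interface and all names here are illustrative, not the actual Clusterpedia interfaces:

```go
package sketch

import "context"

// Writer abstracts a destination that accepts resource writes.
type Writer interface {
	Create(ctx context.Context, obj interface{}) error
}

// dualWrite applies a create to the origin cluster first, then mirrors it
// into memory v2, so reads served from memory v2 observe the write without
// waiting for the corresponding watch event from the synchronizer.
func dualWrite(ctx context.Context, cluster, memory Writer, obj interface{}) error {
	if err := cluster.Create(ctx, obj); err != nil {
		return err
	}
	return memory.Create(ctx, obj)
}
```
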
178 changes: 178 additions & 0 deletions pkg/storage/memorystorage/v2/cache_watcher.go
@@ -0,0 +1,178 @@
package memorystorage

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/watch"
)

// cacheWatcher implements watch.Interface
type cacheWatcher struct {
	input  chan *watchCacheEvent
	result chan watch.Event

	filter filterWithAttrsFunc

	stopped bool
	done    chan struct{}
	forget  func()
}

type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool

func newCacheWatcher(chanSize int, filter filterWithAttrsFunc) *cacheWatcher {
	return &cacheWatcher{
		input:   make(chan *watchCacheEvent, chanSize),
		result:  make(chan watch.Event, chanSize),
		done:    make(chan struct{}),
		filter:  filter,
		forget:  func() {},
		stopped: false,
	}
}

// ResultChan implements watch.Interface.
func (c *cacheWatcher) ResultChan() <-chan watch.Event {
	return c.result
}

// Stop implements watch.Interface.
func (c *cacheWatcher) Stop() {
	c.forget()
}

// stopLocked marks the watcher as stopped and closes its channels. As in
// apiserver's cacher, the caller is expected to serialize calls (typically
// the owner invoking it via forget while holding its own lock).
func (c *cacheWatcher) stopLocked() {
	if !c.stopped {
		c.stopped = true
		close(c.done)
		close(c.input)
	}
}

func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
	select {
	case c.input <- event:
		return true
	default:
		return false
	}
}

// A nil timer means that add will not block: if it cannot send the event
// immediately, it gives up and breaks the watcher.
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool {
	// Try to send the event immediately, without blocking.
	if c.nonblockingAdd(event) {
		return true
	}

	closeFunc := func() {
		c.forget()
	}

	if timer == nil {
		closeFunc()
		return false
	}

	select {
	case c.input <- event:
		return true
	case <-timer.C:
		closeFunc()
		return false
	}
}

func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event {
	curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields)
	var oldObjPasses bool
	if event.PrevObject != nil {
		oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields)
	}
	if !curObjPasses && !oldObjPasses {
		return nil
	}

	switch {
	case curObjPasses && !oldObjPasses:
		return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
	case curObjPasses && oldObjPasses:
		return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
	case !curObjPasses && oldObjPasses:
		oldObj := event.PrevObject.DeepCopyObject()
		return &watch.Event{Type: watch.Deleted, Object: oldObj}
	}
	return nil
}

func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	watchEvent := c.convertToWatchEvent(event)
	if watchEvent == nil {
		// Watcher is not interested in that object.
		return
	}

	// We need to ensure that if we put event X into c.result, all previous
	// events have already been put into it, no matter whether c.done is
	// closed or not. Thus we cannot simply select from c.done and c.result,
	// as that would give us non-determinism. At the same time, we don't want
	// to block infinitely on putting to c.result when c.done is already closed.

	// This ensures that with c.done already closed, we enter the next select
	// at most once more. With that, no matter which branch is chosen there,
	// we will deliver only consecutive events.
	select {
	case <-c.done:
		return
	default:
	}

	select {
	case c.result <- *watchEvent:
	case <-c.done:
	}
}

func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, indexRV uint64) {
	defer utilruntime.HandleCrash()
	defer close(c.result)
	defer c.Stop()

	// First, replay the historical events from the cache interval, tracking
	// the highest index seen so they are not delivered twice below.
	for {
		event, err := cacheInterval.Next()
		if err != nil {
			return
		}
		if event == nil {
			break
		}
		c.sendWatchCacheEvent(event)

		if event.IndexRV > indexRV {
			indexRV = event.IndexRV
		}
	}

	// Then stream fresh events from the input channel, skipping any whose
	// index was already covered by the interval replay.
	for {
		select {
		case event, ok := <-c.input:
			if !ok {
				return
			}
			if event.IndexRV > indexRV {
				c.sendWatchCacheEvent(event)
			}
		case <-ctx.Done():
			return
		}
	}
}
18 changes: 18 additions & 0 deletions pkg/storage/memorystorage/v2/register.go
@@ -0,0 +1,18 @@
package memorystorage

import (
	"github.com/clusterpedia-io/clusterpedia/pkg/storage"
)

const (
	StorageName = "memory.v2/alpha"
)

func init() {
	storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory)
}

// NewStorageFactory returns the memory v2 storage factory; its string
// argument (the storage configuration) is unused.
func NewStorageFactory(_ string) (storage.StorageFactory, error) {
	storageFactory := &StorageFactory{}
	return storageFactory, nil
}