-
Notifications
You must be signed in to change notification settings - Fork 120
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Iceber Gu <[email protected]>
- Loading branch information
Showing
7 changed files
with
1,099 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
# Memory Storage Layer v2 (Alpha) | ||
Due to significant updates and modifications required for the memory storage layer, version 2 (v2) has been introduced. Unless otherwise necessary, memory v1 will only be supported in Clusterpedia 0.x. | ||
|
||
⚠️ The current memory v2 is in the alpha stage, has not undergone rigorous testing, and the foundational storage layer functionalities will be gradually improved and implemented. | ||
|
||
This version draws inspiration from the [apiserver/storage/cacher](https://github.com/kubernetes/kubernetes/tree/master/staging/src/k8s.io/apiserver/pkg/storage/cacher) library but does not necessarily follow all its design principles. | ||
|
||
### Major Changes Compared to v1 | ||
#### ResourceVersion Format Changes | ||
In v1, the resource version used a base64 encoded JSON format to merge the resource versions of each cluster into the resource's resource version field. | ||
|
||
In v2, the resource version format is `<prefix>.<increase int>.<original resource version>`: | ||
* An incrementing integer is used to represent the sequential order of resources, for version operations during List and Watch. | ||
* The original resource version is retained. | ||
* The prefix is used to identify the validity of the incrementing integer when requests switch between instances. | ||
|
||
For Watch requests, a JSON formatted `ListOptions.ResourceVersion = {"<cluster>": "<resource version>"}` is supported to maintain continuity of Watch requests when switching instances. | ||
> The clusterpedia version of Informer is required to replace the k8s.io/client-go Informer. | ||
#### Using a Dedicated Resource Synchro | ||
Due to the unique nature of the memory storage layer, it is unnecessary to use the default resource synchronizer of ClusterSynchro to maintain the informer store. | ||
|
||
Memory v2 will directly use the native k8s informer as the resource synchronizer. Memory v2 will act as the Store for the k8s Informer, saving data directly into storage, avoiding intermediate operations and memory usage. | ||
|
||
#### Supporting Dual Data Source Updates | ||
In addition to the resource synchronizer saving resources in memory v2, external active additions, deletions, and modifications of memory v2 resources are also supported. | ||
|
||
This ensures consistency of requests when supporting write operations at the apiserver layer through dual-write operations. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
package memorystorage | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"k8s.io/apimachinery/pkg/fields" | ||
"k8s.io/apimachinery/pkg/labels" | ||
utilruntime "k8s.io/apimachinery/pkg/util/runtime" | ||
"k8s.io/apimachinery/pkg/watch" | ||
) | ||
|
||
// cacheWatcher implements watch.Interface | ||
type cacheWatcher struct { | ||
input chan *watchCacheEvent | ||
result chan watch.Event | ||
|
||
filter filterWithAttrsFunc | ||
|
||
stopped bool | ||
done chan struct{} | ||
forget func() | ||
} | ||
|
||
type filterWithAttrsFunc func(key string, l labels.Set, f fields.Set) bool | ||
|
||
func newCacheWatcher(chanSize int, filter filterWithAttrsFunc) *cacheWatcher { | ||
return &cacheWatcher{ | ||
input: make(chan *watchCacheEvent, chanSize), | ||
result: make(chan watch.Event, chanSize), | ||
done: make(chan struct{}), | ||
filter: filter, | ||
forget: func() {}, | ||
stopped: false, | ||
} | ||
} | ||
|
||
// ResultChan implements watch.Interface. | ||
func (c *cacheWatcher) ResultChan() <-chan watch.Event { | ||
return c.result | ||
} | ||
|
||
// Stop implements watch.Interface. | ||
func (c *cacheWatcher) Stop() { | ||
c.forget() | ||
} | ||
|
||
func (c *cacheWatcher) stopLocked() { | ||
if !c.stopped { | ||
c.stopped = true | ||
close(c.done) | ||
close(c.input) | ||
} | ||
} | ||
|
||
func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool { | ||
Check failure on line 56 in pkg/storage/memorystorage/v2/cache_watcher.go GitHub Actions / Lint with golangci-lint
|
||
select { | ||
case c.input <- event: | ||
return true | ||
default: | ||
return false | ||
} | ||
} | ||
|
||
// Nil timer means that add will not block (if it can't send event immediately, it will break the watcher) | ||
func (c *cacheWatcher) add(event *watchCacheEvent, timer *time.Timer) bool { | ||
Check failure on line 66 in pkg/storage/memorystorage/v2/cache_watcher.go GitHub Actions / Lint with golangci-lint
|
||
// Try to send the event immediately, without blocking. | ||
if c.nonblockingAdd(event) { | ||
return true | ||
} | ||
|
||
closeFunc := func() { | ||
c.forget() | ||
} | ||
|
||
if timer == nil { | ||
closeFunc() | ||
return false | ||
} | ||
|
||
select { | ||
case c.input <- event: | ||
return true | ||
case <-timer.C: | ||
closeFunc() | ||
return false | ||
} | ||
} | ||
|
||
func (c *cacheWatcher) convertToWatchEvent(event *watchCacheEvent) *watch.Event { | ||
curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields) | ||
var oldObjPasses bool | ||
if event.PrevObject != nil { | ||
oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields) | ||
} | ||
if !curObjPasses && !oldObjPasses { | ||
return nil | ||
} | ||
|
||
switch { | ||
case curObjPasses && !oldObjPasses: | ||
return &watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()} | ||
case curObjPasses && oldObjPasses: | ||
return &watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()} | ||
|
||
case !curObjPasses && oldObjPasses: | ||
oldObj := event.PrevObject.DeepCopyObject() | ||
return &watch.Event{Type: watch.Deleted, Object: oldObj} | ||
} | ||
return nil | ||
} | ||
|
||
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { | ||
watchEvent := c.convertToWatchEvent(event) | ||
if watchEvent == nil { | ||
// Watcher is not interested in that object. | ||
return | ||
} | ||
|
||
// We need to ensure that if we put event X to the c.result, all | ||
// previous events were already put into it before, no matter whether | ||
// c.done is close or not. | ||
// Thus we cannot simply select from c.done and c.result and this | ||
// would give us non-determinism. | ||
// At the same time, we don't want to block infinitely on putting | ||
// to c.result, when c.done is already closed. | ||
|
||
// This ensures that with c.done already close, we at most once go | ||
// into the next select after this. With that, no matter which | ||
// statement we choose there, we will deliver only consecutive | ||
// events. | ||
select { | ||
case <-c.done: | ||
return | ||
default: | ||
} | ||
|
||
select { | ||
case c.result <- *watchEvent: | ||
case <-c.done: | ||
} | ||
} | ||
|
||
func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watchCacheInterval, indexRV uint64) { | ||
defer utilruntime.HandleCrash() | ||
|
||
defer close(c.result) | ||
defer c.Stop() | ||
|
||
for { | ||
event, err := cacheInterval.Next() | ||
if err != nil { | ||
return | ||
} | ||
if event == nil { | ||
break | ||
} | ||
c.sendWatchCacheEvent(event) | ||
|
||
if event.IndexRV > indexRV { | ||
indexRV = event.IndexRV | ||
} | ||
} | ||
|
||
for { | ||
select { | ||
case event, ok := <-c.input: | ||
if !ok { | ||
return | ||
} | ||
if event.IndexRV > indexRV { | ||
c.sendWatchCacheEvent(event) | ||
} | ||
case <-ctx.Done(): | ||
return | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
package memorystorage | ||
|
||
import ( | ||
"github.com/clusterpedia-io/clusterpedia/pkg/storage" | ||
) | ||
|
||
const ( | ||
StorageName = "memory.v2/alpha" | ||
) | ||
|
||
func init() { | ||
storage.RegisterStorageFactoryFunc(StorageName, NewStorageFactory) | ||
} | ||
|
||
func NewStorageFactory(_ string) (storage.StorageFactory, error) { | ||
storageFactory := &StorageFactory{} | ||
return storageFactory, nil | ||
} |
Oops, something went wrong.