Skip to content

Commit

Permalink
Merge pull request #2251 from FabianKramm/store-fixes
Browse files Browse the repository at this point in the history
fix: garbage collection deadlock & mappings memory leak
  • Loading branch information
FabianKramm authored Oct 30, 2024
2 parents 8815197 + a05bdf2 commit c907b9b
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 67 deletions.
5 changes: 3 additions & 2 deletions pkg/mappings/store/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type BackendWatchEvent struct {
type BackendWatchEventType string

const (
BackendWatchEventTypeUpdate BackendWatchEventType = "Update"
BackendWatchEventTypeDelete BackendWatchEventType = "Delete"
BackendWatchEventTypeUpdate BackendWatchEventType = "Update"
BackendWatchEventTypeDelete BackendWatchEventType = "Delete"
BackendWatchEventTypeDeleteReconstructed BackendWatchEventType = "DeleteReconstructed"
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

"github.com/loft-sh/vcluster/pkg/etcd"
"go.etcd.io/etcd/api/v3/mvccpb"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -56,8 +59,6 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
responseChan <- BackendWatchResponse{
Err: event.Err(),
}
case event.IsProgressNotify():
klog.FromContext(ctx).V(1).Info("received progress notify from etcd")
case len(event.Events) > 0:
retEvents := make([]*BackendWatchEvent, 0, len(event.Events))
for _, singleEvent := range event.Events {
Expand Down Expand Up @@ -88,10 +89,15 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
"eventType", eventType,
"error", err.Error(),
)
// FIXME(ThomasK33): This leads to mapping leaks. Etcd might have
// already compacted the previous version. Thus we would never
// receive any information of the mapping that was deleted apart from its keys.
// And because there is no mapping, we are omitting deleting it from the mapping stores.

// we only send the reconstructed mapping to the consumer
if eventType == BackendWatchEventTypeDelete {
retEvents = append(retEvents, &BackendWatchEvent{
Type: BackendWatchEventTypeDeleteReconstructed,
Mapping: reconstructNameMappingFromKey(string(singleEvent.Kv.Key)),
})
}

continue
}

Expand Down Expand Up @@ -124,6 +130,43 @@ func (m *etcdBackend) Delete(ctx context.Context, mapping *Mapping) error {
return m.etcdClient.Delete(ctx, mappingToKey(mapping))
}

func reconstructNameMappingFromKey(key string) *Mapping {
retMapping := &Mapping{}
trimmedKey := strings.TrimPrefix(key, mappingsPrefix)
splittedKey := strings.Split(trimmedKey, "/")
if splittedKey[0] == "v1" {
retMapping.GroupVersionKind = corev1.SchemeGroupVersion.WithKind(splittedKey[1])
if len(splittedKey) == 4 {
retMapping.VirtualName = types.NamespacedName{
Namespace: splittedKey[2],
Name: splittedKey[3],
}
} else {
retMapping.VirtualName = types.NamespacedName{
Name: splittedKey[2],
}
}
} else {
retMapping.GroupVersionKind = schema.GroupVersionKind{
Group: splittedKey[0],
Version: splittedKey[1],
Kind: splittedKey[2],
}
if len(splittedKey) == 5 {
retMapping.VirtualName = types.NamespacedName{
Namespace: splittedKey[3],
Name: splittedKey[4],
}
} else {
retMapping.VirtualName = types.NamespacedName{
Name: splittedKey[3],
}
}
}

return retMapping
}

func mappingToKey(mapping *Mapping) string {
nameNamespace := mapping.VirtualName.Name
if mapping.VirtualName.Namespace != "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,29 @@ func (m *memoryBackend) Delete(_ context.Context, mapping *Mapping) error {

return nil
}

func (m *memoryBackend) DeleteReconstructed(_ context.Context, mapping *Mapping) error {
m.m.Lock()
defer m.m.Unlock()

delete(m.mappings, mapping.NameMapping)
for _, watchChan := range m.watches {
go func(watchChan chan BackendWatchResponse) {
watchChan <- BackendWatchResponse{
Events: []*BackendWatchEvent{
{
Type: BackendWatchEventTypeDeleteReconstructed,
Mapping: &Mapping{
NameMapping: synccontext.NameMapping{
GroupVersionKind: mapping.GroupVersionKind,
VirtualName: mapping.VirtualName,
},
},
},
},
}
}(watchChan)
}

return nil
}
123 changes: 64 additions & 59 deletions pkg/mappings/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package store

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -11,7 +10,7 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
kerrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -105,12 +104,6 @@ func (s *Store) StartGarbageCollection(ctx context.Context) {
}

func (s *Store) garbageCollectMappings(ctx context.Context) {
s.m.Lock()
defer s.m.Unlock()

ctx, cancel := context.WithTimeoutCause(ctx, GarbageCollectionTimeout, errors.New("garbage collection timed out"))
defer cancel()

startTime := time.Now()
klog.FromContext(ctx).V(1).Info(
"start mappings garbage collection",
Expand All @@ -125,46 +118,44 @@ func (s *Store) garbageCollectMappings(ctx context.Context) {
)
}()

for _, mapping := range s.mappings {
select {
case <-ctx.Done():
klog.FromContext(ctx).V(1).Info(
"exiting garbage collection early",
"err", ctx.Err(),
"marker", "gc",
)
return
default:
klog.FromContext(ctx).V(1).Info(
"garbage collecting mapping",
"mapping", mapping.String(),
// copy mappings
s.m.Lock()
existingObjects := map[synccontext.NameMapping]bool{}
for nameMapping := range s.mappings {
existingObjects[nameMapping] = false
}
s.m.Unlock()

// check if exists, this needs to be unlocked, as there are several
// calls to store within informer handlers that would otherwise deadlock
// the syncer if a garbage collection is ongoing
for nameMapping := range existingObjects {
// if object still exists we continue
if s.objectExists(ctx, nameMapping) {
continue
}

// otherwise garbage collect mapping
err := s.garbageCollectMapping(ctx, nameMapping)
if err != nil {
klog.FromContext(ctx).Error(
err,
"garbage collect mapping",
"mapping", nameMapping.String(),
"marker", "gc",
)

err := s.garbageCollectMapping(ctx, mapping)
if err != nil {
klog.FromContext(ctx).Error(
err,
"garbage collect mapping",
"mapping", mapping.String(),
"marker", "gc",
)
}
}
}
}

func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error {
klog.FromContext(ctx).V(1).Info(
"check object exists",
"name", mapping.NameMapping,
"marker", "gc",
)
// check if object exists
exists, err := s.objectExists(ctx, mapping.NameMapping)
if err != nil {
return err
} else if exists {
func (s *Store) garbageCollectMapping(ctx context.Context, nameMapping synccontext.NameMapping) error {
// now delete those mappings whose objects are not found
s.m.Lock()
defer s.m.Unlock()

// get mapping
mapping, ok := s.mappings[nameMapping]
if !ok {
return nil
}

Expand All @@ -174,7 +165,7 @@ func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) err
"marker", "gc",
)
// delete the mapping
err = s.deleteMapping(ctx, mapping)
err := s.deleteMapping(ctx, mapping)
if err != nil {
return err
}
Expand All @@ -201,17 +192,30 @@ func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error {
return nil
}

func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) {
func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) bool {
// build the object we can query
_, err := scheme.Scheme.New(nameMapping.GroupVersionKind)
obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind)
if err != nil {
if !runtime.IsNotRegisteredError(err) {
return false, fmt.Errorf("create object: %w", err)
klog.FromContext(ctx).Info(
"Error finding object type in schema",
"mapping", nameMapping.String(),
"err", err,
"marker", "gc",
)

return true
}

obj = &unstructured.Unstructured{}
}

mObject := &metav1.PartialObjectMetadata{}
mObject.SetGroupVersionKind(nameMapping.GroupVersionKind)
// set kind & apiVersion if unstructured
uObject, ok := obj.(*unstructured.Unstructured)
if ok {
uObject.SetKind(nameMapping.GroupVersionKind.Kind)
uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String())
}

klog.FromContext(ctx).V(1).Info(
"virtual get",
Expand All @@ -220,9 +224,9 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
)

// check if virtual object exists
err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, mObject)
err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object))
if err == nil {
return true, nil
return true
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info(
Expand All @@ -238,7 +242,7 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
// In case of a transient error (server timeout or others)
// the GC should be able to figure out that it doesn't exist
// anymore on the next GC run.
return true, nil
return true
}

klog.FromContext(ctx).V(1).Info(
Expand All @@ -248,9 +252,9 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
)

// check if host object exists
err = s.cachedHostClient.Get(ctx, nameMapping.HostName, mObject)
err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object))
if err == nil {
return true, nil
return true
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info(
Expand All @@ -266,10 +270,10 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
// In case of a transient error (server timeout or others)
// the GC should be able to figure out that it doesn't exist
// anymore on the next GC run.
return true, nil
return true
}

return false, nil
return false
}

func (s *Store) start(ctx context.Context) error {
Expand Down Expand Up @@ -331,19 +335,20 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse
}

// verify mapping if needed
if s.verifyMapping != nil && !s.verifyMapping(event.Mapping.NameMapping) {
if event.Type == BackendWatchEventTypeUpdate && s.verifyMapping != nil && !s.verifyMapping(event.Mapping.NameMapping) {
continue
}

klog.FromContext(ctx).V(1).Info("mapping store received event", "type", event.Type, "mapping", event.Mapping.String())

// remove mapping in any case
oldMapping, ok := s.mappings[event.Mapping.NameMapping]
// remove mapping in any case, the mapping can be incomplete here for DeleteReconstructed events,
// so we need to find the mapping before deleting it.
oldMapping, ok := s.findMapping(event.Mapping.NameMapping)
if ok {
s.removeMapping(oldMapping)
}

// re-add mapping if its an update
// re-add mapping if it's an update
if event.Type == BackendWatchEventTypeUpdate {
s.addMapping(event.Mapping)
}
Expand Down
33 changes: 33 additions & 0 deletions pkg/mappings/store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,39 @@ func TestWatching(t *testing.T) {
return len(store.mappings) == 0 && len(store.hostToVirtualName) == 0 && len(store.virtualToHostName) == 0 && len(store.referencesTo(podMapping.Virtual())) == 0, nil
})
assert.NilError(t, err)

// check save
err = backend.Save(ctx, &Mapping{
NameMapping: secretMapping,
Sender: "doesnotexist",
References: []synccontext.NameMapping{
podMapping,
},
})
assert.NilError(t, err)

// wait for event to arrive
err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second*3, true, func(_ context.Context) (bool, error) {
store.m.Lock()
defer store.m.Unlock()
return len(store.mappings) == 1 && len(store.hostToVirtualName) == 2 && len(store.virtualToHostName) == 2 && len(store.referencesTo(podMapping.Virtual())) == 1, nil
})
assert.NilError(t, err)

// check delete
err = backend.DeleteReconstructed(ctx, &Mapping{
NameMapping: secretMapping,
Sender: "doesnotexist",
})
assert.NilError(t, err)

// wait for event to arrive
err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second*3, true, func(_ context.Context) (bool, error) {
store.m.Lock()
defer store.m.Unlock()
return len(store.mappings) == 0 && len(store.hostToVirtualName) == 0 && len(store.virtualToHostName) == 0 && len(store.referencesTo(podMapping.Virtual())) == 0, nil
})
assert.NilError(t, err)
}

func TestGarbageCollectMappings(t *testing.T) {
Expand Down

0 comments on commit c907b9b

Please sign in to comment.