diff --git a/pkg/mappings/store/backend.go b/pkg/mappings/store/backend.go index df1199de69..b3609479c0 100644 --- a/pkg/mappings/store/backend.go +++ b/pkg/mappings/store/backend.go @@ -29,6 +29,7 @@ type BackendWatchEvent struct { type BackendWatchEventType string const ( - BackendWatchEventTypeUpdate BackendWatchEventType = "Update" - BackendWatchEventTypeDelete BackendWatchEventType = "Delete" + BackendWatchEventTypeUpdate BackendWatchEventType = "Update" + BackendWatchEventTypeDelete BackendWatchEventType = "Delete" + BackendWatchEventTypeDeleteReconstructed BackendWatchEventType = "DeleteReconstructed" ) diff --git a/pkg/mappings/store/etcd_backend.go b/pkg/mappings/store/backend_etcd.go similarity index 69% rename from pkg/mappings/store/etcd_backend.go rename to pkg/mappings/store/backend_etcd.go index 68206d994c..5a185659de 100644 --- a/pkg/mappings/store/etcd_backend.go +++ b/pkg/mappings/store/backend_etcd.go @@ -9,6 +9,9 @@ import ( "github.com/loft-sh/vcluster/pkg/etcd" "go.etcd.io/etcd/api/v3/mvccpb" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" "k8s.io/klog/v2" ) @@ -56,8 +59,6 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { responseChan <- BackendWatchResponse{ Err: event.Err(), } - case event.IsProgressNotify(): - klog.FromContext(ctx).V(1).Info("received progress notify from etcd") case len(event.Events) > 0: retEvents := make([]*BackendWatchEvent, 0, len(event.Events)) for _, singleEvent := range event.Events { @@ -88,10 +89,15 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { "eventType", eventType, "error", err.Error(), ) - // FIXME(ThomasK33): This leads to mapping leaks. Etcd might have - // already compacted the previous version. Thus we would never - // receive any information of the mapping that was deleted apart from its keys. - // And because there is no mapping, we are omitting deleting it from the mapping stores. + + // we only send the reconstructed mapping to the consumer + if eventType == BackendWatchEventTypeDelete { + retEvents = append(retEvents, &BackendWatchEvent{ + Type: BackendWatchEventTypeDeleteReconstructed, + Mapping: reconstructNameMappingFromKey(string(singleEvent.Kv.Key)), + }) + } + continue } @@ -124,6 +130,43 @@ func (m *etcdBackend) Delete(ctx context.Context, mapping *Mapping) error { return m.etcdClient.Delete(ctx, mappingToKey(mapping)) } +func reconstructNameMappingFromKey(key string) *Mapping { + retMapping := &Mapping{} + trimmedKey := strings.TrimPrefix(key, mappingsPrefix) + splittedKey := strings.Split(trimmedKey, "/") + if splittedKey[0] == "v1" { + retMapping.GroupVersionKind = corev1.SchemeGroupVersion.WithKind(splittedKey[1]) + if len(splittedKey) == 4 { + retMapping.VirtualName = types.NamespacedName{ + Namespace: splittedKey[2], + Name: splittedKey[3], + } + } else { + retMapping.VirtualName = types.NamespacedName{ + Name: splittedKey[2], + } + } + } else { + retMapping.GroupVersionKind = schema.GroupVersionKind{ + Group: splittedKey[0], + Version: splittedKey[1], + Kind: splittedKey[2], + } + if len(splittedKey) == 5 { + retMapping.VirtualName = types.NamespacedName{ + Namespace: splittedKey[3], + Name: splittedKey[4], + } + } else { + retMapping.VirtualName = types.NamespacedName{ + Name: splittedKey[3], + } + } + } + + return retMapping +} + func mappingToKey(mapping *Mapping) string { nameNamespace := mapping.VirtualName.Name if mapping.VirtualName.Namespace != "" { diff --git a/pkg/mappings/store/memory_backend.go b/pkg/mappings/store/backend_memory.go similarity index 77% rename from pkg/mappings/store/memory_backend.go rename to pkg/mappings/store/backend_memory.go index a9589abc29..47120d8309 100644 --- a/pkg/mappings/store/memory_backend.go +++ b/pkg/mappings/store/backend_memory.go @@ -107,3 +107,29 @@ func (m *memoryBackend) Delete(_ context.Context, mapping *Mapping) error { return nil } + +func (m *memoryBackend) DeleteReconstructed(_ context.Context, mapping *Mapping) error { + m.m.Lock() + defer m.m.Unlock() + + delete(m.mappings, mapping.NameMapping) + for _, watchChan := range m.watches { + go func(watchChan chan BackendWatchResponse) { + watchChan <- BackendWatchResponse{ + Events: []*BackendWatchEvent{ + { + Type: BackendWatchEventTypeDeleteReconstructed, + Mapping: &Mapping{ + NameMapping: synccontext.NameMapping{ + GroupVersionKind: mapping.GroupVersionKind, + VirtualName: mapping.VirtualName, + }, + }, + }, + }, + } + }(watchChan) + } + + return nil +} diff --git a/pkg/mappings/store/store.go b/pkg/mappings/store/store.go index a8086573f3..4f92517d28 100644 --- a/pkg/mappings/store/store.go +++ b/pkg/mappings/store/store.go @@ -2,7 +2,6 @@ package store import ( "context" - "errors" "fmt" "sync" "time" @@ -11,7 +10,7 @@ import ( "github.com/loft-sh/vcluster/pkg/scheme" "github.com/loft-sh/vcluster/pkg/syncer/synccontext" kerrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" @@ -105,12 +104,6 @@ func (s *Store) StartGarbageCollection(ctx context.Context) { } func (s *Store) garbageCollectMappings(ctx context.Context) { - s.m.Lock() - defer s.m.Unlock() - - ctx, cancel := context.WithTimeoutCause(ctx, GarbageCollectionTimeout, errors.New("garbage collection timed out")) - defer cancel() - startTime := time.Now() klog.FromContext(ctx).V(1).Info( "start mappings garbage collection", @@ -125,46 +118,44 @@ func (s *Store) garbageCollectMappings(ctx context.Context) { ) }() - for _, mapping := range s.mappings { - select { - case <-ctx.Done(): - klog.FromContext(ctx).V(1).Info( - "exiting garbage collection early", - "err", ctx.Err(), - "marker", "gc", - ) - return - default: - klog.FromContext(ctx).V(1).Info( - "garbage collecting mapping", - "mapping", mapping.String(), + // copy mappings + s.m.Lock() + existingObjects := map[synccontext.NameMapping]bool{} + for nameMapping := range s.mappings { + existingObjects[nameMapping] = false + } + s.m.Unlock() + + // check if exists, this needs to be unlocked, as there are several + // calls to store within informer handlers that would otherwise deadlock + // the syncer if a garbage collection is ongoing + for nameMapping := range existingObjects { + // if object still exists we continue + if s.objectExists(ctx, nameMapping) { + continue + } + + // otherwise garbage collect mapping + err := s.garbageCollectMapping(ctx, nameMapping) + if err != nil { + klog.FromContext(ctx).Error( + err, + "garbage collect mapping", + "mapping", nameMapping.String(), "marker", "gc", ) - - err := s.garbageCollectMapping(ctx, mapping) - if err != nil { - klog.FromContext(ctx).Error( - err, - "garbage collect mapping", - "mapping", mapping.String(), - "marker", "gc", - ) - } } } } -func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error { - klog.FromContext(ctx).V(1).Info( - "check object exists", - "name", mapping.NameMapping, - "marker", "gc", - ) - // check if object exists - exists, err := s.objectExists(ctx, mapping.NameMapping) - if err != nil { - return err - } else if exists { +func (s *Store) garbageCollectMapping(ctx context.Context, nameMapping synccontext.NameMapping) error { + // now delete those mappings whose objects are not found + s.m.Lock() + defer s.m.Unlock() + + // get mapping + mapping, ok := s.mappings[nameMapping] + if !ok { return nil } @@ -174,7 +165,7 @@ func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) err "marker", "gc", ) // delete the mapping - err = s.deleteMapping(ctx, mapping) + err := s.deleteMapping(ctx, mapping) if err != nil { return err } @@ -201,17 +192,30 @@ func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error { return nil } -func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) { +func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) bool { // build the object we can query - _, err := scheme.Scheme.New(nameMapping.GroupVersionKind) + obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind) if err != nil { if !runtime.IsNotRegisteredError(err) { - return false, fmt.Errorf("create object: %w", err) + klog.FromContext(ctx).Info( + "Error finding object type in schema", + "mapping", nameMapping.String(), + "err", err, + "marker", "gc", + ) + + return true } + + obj = &unstructured.Unstructured{} } - mObject := &metav1.PartialObjectMetadata{} - mObject.SetGroupVersionKind(nameMapping.GroupVersionKind) + // set kind & apiVersion if unstructured + uObject, ok := obj.(*unstructured.Unstructured) + if ok { + uObject.SetKind(nameMapping.GroupVersionKind.Kind) + uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String()) + } klog.FromContext(ctx).V(1).Info( "virtual get", @@ -220,9 +224,9 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa ) // check if virtual object exists - err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, mObject) + err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object)) if err == nil { - return true, nil + return true } else if !kerrors.IsNotFound(err) { // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. klog.FromContext(ctx).Info( @@ -238,7 +242,7 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa // In case of a transient error (server timeout or others) // the GC should be able to figure out that it doesn't exist // anymore on the next GC run. - return true, nil + return true } klog.FromContext(ctx).V(1).Info( @@ -248,9 +252,9 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa ) // check if host object exists - err = s.cachedHostClient.Get(ctx, nameMapping.HostName, mObject) + err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object)) if err == nil { - return true, nil + return true } else if !kerrors.IsNotFound(err) { // TODO: filter out other allowed errors here could be Forbidden, Type not found etc. klog.FromContext(ctx).Info( @@ -266,10 +270,10 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa // In case of a transient error (server timeout or others) // the GC should be able to figure out that it doesn't exist // anymore on the next GC run. - return true, nil + return true } - return false, nil + return false } func (s *Store) start(ctx context.Context) error { @@ -331,19 +335,20 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse } // verify mapping if needed - if s.verifyMapping != nil && !s.verifyMapping(event.Mapping.NameMapping) { + if event.Type == BackendWatchEventTypeUpdate && s.verifyMapping != nil && !s.verifyMapping(event.Mapping.NameMapping) { continue } klog.FromContext(ctx).V(1).Info("mapping store received event", "type", event.Type, "mapping", event.Mapping.String()) - // remove mapping in any case - oldMapping, ok := s.mappings[event.Mapping.NameMapping] + // remove mapping in any case, the mapping can be incomplete here for DeleteReconstructed events, + // so we need to find the mapping before deleting it. + oldMapping, ok := s.findMapping(event.Mapping.NameMapping) if ok { s.removeMapping(oldMapping) } - // re-add mapping if its an update + // re-add mapping if it's an update if event.Type == BackendWatchEventTypeUpdate { s.addMapping(event.Mapping) } diff --git a/pkg/mappings/store/store_test.go b/pkg/mappings/store/store_test.go index 19f0b8620c..ad96b7b9f4 100644 --- a/pkg/mappings/store/store_test.go +++ b/pkg/mappings/store/store_test.go @@ -188,6 +188,39 @@ func TestWatching(t *testing.T) { return len(store.mappings) == 0 && len(store.hostToVirtualName) == 0 && len(store.virtualToHostName) == 0 && len(store.referencesTo(podMapping.Virtual())) == 0, nil }) assert.NilError(t, err) + + // check save + err = backend.Save(ctx, &Mapping{ + NameMapping: secretMapping, + Sender: "doesnotexist", + References: []synccontext.NameMapping{ + podMapping, + }, + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second*3, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 1 && len(store.hostToVirtualName) == 2 && len(store.virtualToHostName) == 2 && len(store.referencesTo(podMapping.Virtual())) == 1, nil + }) + assert.NilError(t, err) + + // check delete + err = backend.DeleteReconstructed(ctx, &Mapping{ + NameMapping: secretMapping, + Sender: "doesnotexist", + }) + assert.NilError(t, err) + + // wait for event to arrive + err = wait.PollUntilContextTimeout(ctx, time.Millisecond*10, time.Second*3, true, func(_ context.Context) (bool, error) { + store.m.Lock() + defer store.m.Unlock() + return len(store.mappings) == 0 && len(store.hostToVirtualName) == 0 && len(store.virtualToHostName) == 0 && len(store.referencesTo(podMapping.Virtual())) == 0, nil + }) + assert.NilError(t, err) } func TestGarbageCollectMappings(t *testing.T) {