Skip to content

Commit

Permalink
wip: Only get PartialObjectMetadata in GC
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Kosiewski committed Oct 18, 2024
1 parent ab31472 commit 58ee149
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 51 deletions.
6 changes: 3 additions & 3 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,10 @@ func (c *client) Put(ctx context.Context, key string, value []byte) error {
if err == nil {
_, err := c.c.Put(ctx, key, string(value), clientv3.WithIgnoreLease())
return err
} else {
_, err := c.c.Put(ctx, key, string(value))
return err
}

_, err = c.c.Put(ctx, key, string(value))
return err
}

func (c *client) Delete(ctx context.Context, key string) error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/generic/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func RecordMapping(ctx *synccontext.SyncContext, pName, vName types.NamespacedNa
}

// record the reference
err := ctx.Mappings.Store().AddReference(ctx, synccontext.NameMapping{
err := ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{
GroupVersionKind: gvk,

HostName: pName,
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/store/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
// FIXME(ThomasK33): This leads to mapping leaks. Etcd might have
// already compacted the previous version. Thus we would never
// receive any information of the mapping that was deleted apart from its keys.
// And because there is no mapping, we are ommitting deleting it from the mapping stores.
// And because there is no mapping, we are omitting deleting it from the mapping stores.
continue
}

Expand Down
150 changes: 105 additions & 45 deletions pkg/mappings/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package store

import (
"context"
"errors"
"fmt"
"sync"
"time"
Expand All @@ -10,18 +11,20 @@ import (
"github.com/loft-sh/vcluster/pkg/scheme"
"github.com/loft-sh/vcluster/pkg/syncer/synccontext"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
)

const GarbageCollectionInterval = time.Minute * 3
const (
GarbageCollectionInterval = 3 * time.Minute
GarbageCollectionTimeout = 15 * time.Second
)

type VerifyMapping func(mapping synccontext.NameMapping) bool

Expand Down Expand Up @@ -97,31 +100,66 @@ func (s *Store) Watch(gvk schema.GroupVersionKind, addQueueFn synccontext.AddQue

func (s *Store) StartGarbageCollection(ctx context.Context) {
go func() {
wait.Until(func() {
s.garbageCollectMappings(ctx)
}, GarbageCollectionInterval, ctx.Done())
wait.UntilWithContext(ctx, s.garbageCollectMappings, GarbageCollectionInterval)
}()
}

func (s *Store) garbageCollectMappings(ctx context.Context) {
s.m.Lock()
defer s.m.Unlock()

ctx, cancel := context.WithTimeoutCause(ctx, GarbageCollectionTimeout, errors.New("garbage collection timed out"))
defer cancel()

startTime := time.Now()
klog.FromContext(ctx).V(1).Info("Start mappings garbage collection")
klog.FromContext(ctx).V(1).Info(
"start mappings garbage collection",
"mappings", len(s.mappings),
"marker", "gc",
)
defer func() {
klog.FromContext(ctx).V(1).Info("Garbage collection done", "took", time.Since(startTime).String())
klog.FromContext(ctx).V(1).Info(
"garbage collection done",
"took", time.Since(startTime).String(),
"marker", "gc",
)
}()

for _, mapping := range s.mappings {
err := s.garbageCollectMapping(ctx, mapping)
if err != nil {
klog.FromContext(ctx).Error(err, "Garbage collect mapping", "mapping", mapping.String())
select {
case <-ctx.Done():
klog.FromContext(ctx).V(1).Info(
"exiting garbage collection early",
"err", ctx.Err(),
"marker", "gc",
)
return
default:
klog.FromContext(ctx).V(1).Info(
"garbage collecting mapping",
"mapping", mapping.String(),
"marker", "gc",
)

err := s.garbageCollectMapping(ctx, mapping)
if err != nil {
klog.FromContext(ctx).Error(
err,
"garbage collect mapping",
"mapping", mapping.String(),
"marker", "gc",
)
}
}
}
}

func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) error {
klog.FromContext(ctx).V(1).Info(
"check object exists",
"name", mapping.NameMapping,
"marker", "gc",
)
// check if object exists
exists, err := s.objectExists(ctx, mapping.NameMapping)
if err != nil {
Expand All @@ -130,13 +168,22 @@ func (s *Store) garbageCollectMapping(ctx context.Context, mapping *Mapping) err
return nil
}

klog.FromContext(ctx).V(1).Info(
"delete mapping",
"name", mapping.NameMapping,
"marker", "gc",
)
// delete the mapping
err = s.deleteMapping(ctx, mapping)
if err != nil {
return err
}

klog.FromContext(ctx).Info("Remove mapping as both virtual and host were not found", "mapping", mapping.String())
klog.FromContext(ctx).Info(
"Remove mapping as both virtual and host were not found",
"mapping", mapping.String(),
"marker", "gc",
)
return nil
}

Expand All @@ -156,58 +203,71 @@ func (s *Store) deleteMapping(ctx context.Context, mapping *Mapping) error {

func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMapping) (bool, error) {
// build the object we can query
obj, err := scheme.Scheme.New(nameMapping.GroupVersionKind)
_, err := scheme.Scheme.New(nameMapping.GroupVersionKind)
if err != nil {
if !runtime.IsNotRegisteredError(err) {
return false, fmt.Errorf("create object: %w", err)
}

obj = &unstructured.Unstructured{}
}

// set kind & apiVersion if unstructured
uObject, ok := obj.(*unstructured.Unstructured)
if ok {
uObject.SetKind(nameMapping.Kind)
uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String())
}
mObject := &metav1.PartialObjectMetadata{}
mObject.SetGroupVersionKind(nameMapping.GroupVersionKind)

// check if virtual object exists
retriableErrors := func(env string) func(error) bool {
return func(err error) bool {
if kerrors.IsNotFound(err) {
return false
}

obj := obj.DeepCopyObject().(client.Object)
klog.FromContext(ctx).Info("failed to check if object exists in the "+env,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
"err", err,
)
return kerrors.IsTimeout(err) || kerrors.IsInternalError(err) || kerrors.IsServerTimeout(err)
}
}
klog.FromContext(ctx).V(1).Info(
"virtual get",
"name", nameMapping.VirtualName,
"marker", "gc",
)

err = retry.OnError(retry.DefaultBackoff, retriableErrors("virtual cluster"), func() error {
return s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object))
})
// check if virtual object exists
err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, mObject)
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving virtual object", "virtualObject", nameMapping.Virtual().String())
klog.FromContext(ctx).Info(
"Error retrieving virtual object",
"virtualObject", nameMapping.Virtual().String(),
"err", err,
"marker", "gc",
)

// (ThomasK33): If the error is a not found, we're going
// to assume that the object is still used.
//
// In case of a transient error (server timeout or others)
// the GC should be able to figure out that it doesn't exist
// anymore on the next GC run.
return true, nil
}

klog.FromContext(ctx).V(1).Info(
"host get",
"name", nameMapping.HostName,
"marker", "gc",
)

// check if host object exists
err = retry.OnError(retry.DefaultBackoff, retriableErrors("host cluster"), func() error {
return s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object))
})
err = s.cachedHostClient.Get(ctx, nameMapping.HostName, mObject)
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
// TODO: filter out other allowed errors here could be Forbidden, Type not found etc.
klog.FromContext(ctx).Info("Error retrieving host object", "hostObject", nameMapping.Host().String())
klog.FromContext(ctx).Info(
"Error retrieving host object",
"hostObject", nameMapping.Host().String(),
"err", err,
"marker", "gc",
)

// (ThomasK33): If the error is a not found, we're going
// to assume that the object is still used.
//
// In case of a transient error (server timeout or others)
// the GC should be able to figure out that it doesn't exist
// anymore on the next GC run.
return true, nil

}

return false, nil
Expand Down
8 changes: 7 additions & 1 deletion pkg/setup/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,13 @@ func initControllerContext(
Config: vClusterOptions,
}

mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")))
mappingStore, err := store.NewStoreWithVerifyMapping(
ctx,
virtualManager.GetClient(),
localManager.GetClient(),
store.NewEtcdBackend(etcdClient),
verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")),
)
if err != nil {
return nil, fmt.Errorf("start mapping store: %w", err)
}
Expand Down

0 comments on commit 58ee149

Please sign in to comment.