From ab31472ce6966d903cc6cd1f5cd5a8d4a137946e Mon Sep 17 00:00:00 2001 From: Thomas Kosiewski Date: Thu, 17 Oct 2024 15:52:59 +0200 Subject: [PATCH] wip: Added repro for deadlock issue --- Dockerfile.release | 4 +- Justfile | 15 ++++++++ pkg/etcd/client.go | 19 +++++++--- pkg/etcd/util.go | 5 +++ pkg/mappings/store/backend.go | 5 +-- pkg/mappings/store/etcd_backend.go | 19 ++++++++-- pkg/mappings/store/store.go | 14 +++++-- pkg/setup/controller_context.go | 20 ++++------ vcluster.yaml | 59 ++++++++++++++++++++++++++++++ 9 files changed, 130 insertions(+), 30 deletions(-) create mode 100644 vcluster.yaml diff --git a/Dockerfile.release b/Dockerfile.release index 12ffe7d03..be38a0d29 100644 --- a/Dockerfile.release +++ b/Dockerfile.release @@ -1,8 +1,8 @@ ARG KINE_VERSION="v0.13.1" -FROM rancher/kine:${KINE_VERSION} as kine +FROM rancher/kine:${KINE_VERSION} AS kine # Build the manager binary -FROM alpine:3.20 as builder +FROM alpine:3.20 AS builder WORKDIR /vcluster-dev diff --git a/Justfile b/Justfile index 191e1c485..40cf4c3fc 100644 --- a/Justfile +++ b/Justfile @@ -168,3 +168,18 @@ build-dev-image tag="": cp dist/vcluster_linux_$(go env GOARCH | sed s/amd64/amd64_v1/g)/vcluster ./vcluster docker build -t vcluster:dev-{{tag}} -f Dockerfile.release --build-arg TARGETARCH=$(uname -m) --build-arg TARGETOS=linux . rm ./vcluster + +run-conformance k8s_version="1.31.1" tag="conf": (build-dev-image tag) + minikube start --kubernetes-version {{ k8s_version }} --nodes=2 + minikube addons enable metrics-server + minikube image load vcluster:dev-{{tag}} + + vcluster create vcluster -n vcluster -f vcluster.yaml + + sonobuoy run --mode=conformance-lite --level=debug + +conformance-status: + sonobuoy status + +conformance-logs: + sonobuoy logs diff --git a/pkg/etcd/client.go b/pkg/etcd/client.go index 1ec73aadc..32df4cd9b 100644 --- a/pkg/etcd/client.go +++ b/pkg/etcd/client.go @@ -3,6 +3,7 @@ package etcd import ( "context" "errors" + "fmt" vconfig "github.com/loft-sh/vcluster/config" "github.com/loft-sh/vcluster/pkg/config" @@ -97,11 +98,11 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) ( } func (c *client) Watch(ctx context.Context, key string) clientv3.WatchChan { - return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV()) + return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithProgressNotify()) } func (c *client) List(ctx context.Context, key string) ([]Value, error) { - resp, err := c.c.Get(ctx, key, clientv3.WithPrefix()) + resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0)) if err != nil { return nil, err } @@ -118,9 +119,9 @@ func (c *client) List(ctx context.Context, key string) ([]Value, error) { } func (c *client) Get(ctx context.Context, key string) (Value, error) { - resp, err := c.c.Get(ctx, key) + resp, err := c.c.Get(ctx, key, clientv3.WithRev(0)) if err != nil { - return Value{}, err + return Value{}, fmt.Errorf("etcd get: %w", err) } if len(resp.Kvs) == 0 { @@ -148,8 +149,14 @@ func (c *client) Get(ctx context.Context, key string) (Value, error) { } func (c *client) Put(ctx context.Context, key string, value []byte) error { - _, err := c.c.Put(ctx, key, string(value)) - return err + _, err := c.Get(ctx, key) + if err == nil { + _, err := c.c.Put(ctx, key, string(value), clientv3.WithIgnoreLease()) + return err + } else { + _, err := c.c.Put(ctx, key, string(value)) + return err + } } func (c *client) Delete(ctx context.Context, key string) error { diff --git a/pkg/etcd/util.go b/pkg/etcd/util.go index 9440ad29f..bdf03fa48 100644 --- a/pkg/etcd/util.go +++ b/pkg/etcd/util.go @@ -72,6 +72,11 @@ func getClientConfig(ctx context.Context, certificates *Certificates, endpoints DialTimeout: 5 * time.Second, Logger: zap.L().Named("etcd-client"), + + // DialOptions: []grpc.DialOption{ + // grpc.WithDisableRetry(), + // }, + // MaxUnaryRetries: 1, } if len(endpoints) > 0 { diff --git a/pkg/mappings/store/backend.go b/pkg/mappings/store/backend.go index ac35fea75..df1199de6 100644 --- a/pkg/mappings/store/backend.go +++ b/pkg/mappings/store/backend.go @@ -17,14 +17,13 @@ type Backend interface { } type BackendWatchResponse struct { + Err error Events []*BackendWatchEvent - - Err error } type BackendWatchEvent struct { - Type BackendWatchEventType Mapping *Mapping + Type BackendWatchEventType } type BackendWatchEventType string diff --git a/pkg/mappings/store/etcd_backend.go b/pkg/mappings/store/etcd_backend.go index 9a7e1238b..41d3bfb38 100644 --- a/pkg/mappings/store/etcd_backend.go +++ b/pkg/mappings/store/etcd_backend.go @@ -51,11 +51,14 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { defer close(responseChan) for event := range watchChan { - if event.Canceled { + switch { + case event.Canceled: responseChan <- BackendWatchResponse{ Err: event.Err(), } - } else if len(event.Events) > 0 { + case event.IsProgressNotify(): + klog.FromContext(ctx).V(1).Info("received progress notify from etcd") + case len(event.Events) > 0: retEvents := make([]*BackendWatchEvent, 0, len(event.Events)) for _, singleEvent := range event.Events { var eventType BackendWatchEventType @@ -70,7 +73,13 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { // parse mapping retMapping := &Mapping{} - err := json.Unmarshal(singleEvent.Kv.Value, retMapping) + + value := singleEvent.Kv.Value + if len(value) == 0 && singleEvent.Type == mvccpb.DELETE && singleEvent.PrevKv != nil { + value = singleEvent.PrevKv.Value + } + + err := json.Unmarshal(value, retMapping) if err != nil { klog.FromContext(ctx).Info( "etcd backend: Error decoding event", @@ -79,6 +88,10 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse { "eventType", eventType, "error", err.Error(), ) + // FIXME(ThomasK33): This leads to mapping leaks. Etcd might have + // already compacted the previous version. Thus we would never + // receive any information of the mapping that was deleted apart from its keys. + // And because there is no mapping, we are ommitting deleting it from the mapping stores. continue } diff --git a/pkg/mappings/store/store.go b/pkg/mappings/store/store.go index 16a5e6211..3dc2cdfdc 100644 --- a/pkg/mappings/store/store.go +++ b/pkg/mappings/store/store.go @@ -238,13 +238,13 @@ func (s *Store) start(ctx context.Context) error { } go func() { - wait.Until(func() { + wait.UntilWithContext(ctx, func(ctx context.Context) { for watchEvent := range s.backend.Watch(ctx) { s.handleEvent(ctx, watchEvent) } klog.FromContext(ctx).Info("mapping store watch has ended") - }, time.Second, ctx.Done()) + }, time.Second) }() return nil @@ -254,6 +254,12 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse s.m.Lock() defer s.m.Unlock() + klog.FromContext(ctx).V(1).Info( + "handling mapping store events", + "len", len(watchEvent.Events), + "err", watchEvent.Err, + ) + if watchEvent.Err != nil { klog.FromContext(ctx).Error(watchEvent.Err, "watch err in mappings store") return @@ -477,8 +483,8 @@ func (s *Store) DeleteMapping(ctx context.Context, nameMapping synccontext.NameM } func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping { - s.m.Lock() - defer s.m.Unlock() + s.m.RLock() + defer s.m.RUnlock() retReferences := s.referencesTo(vObj) klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences)) diff --git a/pkg/setup/controller_context.go b/pkg/setup/controller_context.go index dd3b1d70d..3ec0a213b 100644 --- a/pkg/setup/controller_context.go +++ b/pkg/setup/controller_context.go @@ -8,6 +8,7 @@ import ( "github.com/loft-sh/vcluster/pkg/config" "github.com/loft-sh/vcluster/pkg/controllers/resources/nodes" + "github.com/loft-sh/vcluster/pkg/etcd" "github.com/loft-sh/vcluster/pkg/mappings" "github.com/loft-sh/vcluster/pkg/mappings/store" "github.com/loft-sh/vcluster/pkg/mappings/store/verify" @@ -362,10 +363,10 @@ func initControllerContext( return nil, err } - // etcdClient, err := etcd.NewFromConfig(ctx, vClusterOptions) - // if err != nil { - // return nil, fmt.Errorf("create etcd client: %w", err) - // } + etcdClient, err := etcd.NewFromConfig(ctx, vClusterOptions) + if err != nil { + return nil, fmt.Errorf("create etcd client: %w", err) + } controllerContext := &synccontext.ControllerContext{ Context: ctx, @@ -380,17 +381,12 @@ func initControllerContext( Config: vClusterOptions, } - // mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping"))) - // if err != nil { - // return nil, fmt.Errorf("start mapping store: %w", err) - // } - - inMemoryMappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewMemoryBackend(), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping"))) + mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping"))) if err != nil { - return nil, fmt.Errorf("start in-memory mapping store: %w", err) + return nil, fmt.Errorf("start mapping store: %w", err) } - controllerContext.Mappings = mappings.NewMappingsRegistry(inMemoryMappingStore) + controllerContext.Mappings = mappings.NewMappingsRegistry(mappingStore) return controllerContext, nil } diff --git a/vcluster.yaml b/vcluster.yaml new file mode 100644 index 000000000..71cd495b7 --- /dev/null +++ b/vcluster.yaml @@ -0,0 +1,59 @@ +controlPlane: + advanced: + virtualScheduler: + enabled: true + backingStore: + etcd: + deploy: + enabled: true + statefulSet: + image: + tag: 3.5.14-0 + distro: + k8s: + apiServer: + extraArgs: + - --service-account-jwks-uri=https://kubernetes.default.svc.cluster.local/openid/v1/jwks + image: + tag: v1.31.1 + controllerManager: + image: + tag: v1.31.1 + enabled: true + scheduler: + image: + tag: v1.31.1 + statefulSet: + scheduling: + podManagementPolicy: OrderedReady + image: + registry: "" + repository: "vcluster" + tag: "dev-conf" + env: + - name: DEBUG + value: "true" + +networking: + advanced: + proxyKubelets: + byHostname: false + byIP: false + +sync: + fromHost: + csiDrivers: + enabled: false + csiStorageCapacities: + enabled: false + nodes: + enabled: true + selector: + all: true + toHost: + persistentVolumes: + enabled: true + priorityClasses: + enabled: true + storageClasses: + enabled: true