Skip to content

Commit

Permalink
wip: Added repro for deadlock issue
Browse files Browse the repository at this point in the history
  • Loading branch information
Thomas Kosiewski committed Oct 17, 2024
1 parent 1656136 commit ab31472
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 30 deletions.
4 changes: 2 additions & 2 deletions Dockerfile.release
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
ARG KINE_VERSION="v0.13.1"
FROM rancher/kine:${KINE_VERSION} as kine
FROM rancher/kine:${KINE_VERSION} AS kine

# Build the manager binary
FROM alpine:3.20 as builder
FROM alpine:3.20 AS builder

WORKDIR /vcluster-dev

Expand Down
15 changes: 15 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -168,3 +168,18 @@ build-dev-image tag="":
cp dist/vcluster_linux_$(go env GOARCH | sed s/amd64/amd64_v1/g)/vcluster ./vcluster
docker build -t vcluster:dev-{{tag}} -f Dockerfile.release --build-arg TARGETARCH=$(uname -m) --build-arg TARGETOS=linux .
rm ./vcluster

run-conformance k8s_version="1.31.1" tag="conf": (build-dev-image tag)
  minikube start --kubernetes-version {{ k8s_version }} --nodes=2
  minikube addons enable metrics-server
  minikube image load vcluster:dev-{{tag}}

  vcluster create vcluster -n vcluster -f vcluster.yaml

  sonobuoy run --mode=conformance-lite --level=debug

conformance-status:
  sonobuoy status

conformance-logs:
  sonobuoy logs
19 changes: 13 additions & 6 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package etcd
import (
"context"
"errors"
"fmt"

vconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
Expand Down Expand Up @@ -97,11 +98,11 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) (
}

func (c *client) Watch(ctx context.Context, key string) clientv3.WatchChan {
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV())
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithProgressNotify())
}

func (c *client) List(ctx context.Context, key string) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix())
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0))
if err != nil {
return nil, err
}
Expand All @@ -118,9 +119,9 @@ func (c *client) List(ctx context.Context, key string) ([]Value, error) {
}

func (c *client) Get(ctx context.Context, key string) (Value, error) {
resp, err := c.c.Get(ctx, key)
resp, err := c.c.Get(ctx, key, clientv3.WithRev(0))
if err != nil {
return Value{}, err
return Value{}, fmt.Errorf("etcd get: %w", err)
}

if len(resp.Kvs) == 0 {
Expand Down Expand Up @@ -148,8 +149,14 @@ func (c *client) Get(ctx context.Context, key string) (Value, error) {
}

func (c *client) Put(ctx context.Context, key string, value []byte) error {
_, err := c.c.Put(ctx, key, string(value))
return err
_, err := c.Get(ctx, key)
if err == nil {
_, err := c.c.Put(ctx, key, string(value), clientv3.WithIgnoreLease())
return err
} else {

Check failure on line 156 in pkg/etcd/client.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
_, err := c.c.Put(ctx, key, string(value))
return err
}
}

func (c *client) Delete(ctx context.Context, key string) error {
Expand Down
5 changes: 5 additions & 0 deletions pkg/etcd/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func getClientConfig(ctx context.Context, certificates *Certificates, endpoints
DialTimeout: 5 * time.Second,

Logger: zap.L().Named("etcd-client"),

// DialOptions: []grpc.DialOption{
// grpc.WithDisableRetry(),
// },
// MaxUnaryRetries: 1,
}

if len(endpoints) > 0 {
Expand Down
5 changes: 2 additions & 3 deletions pkg/mappings/store/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ type Backend interface {
}

type BackendWatchResponse struct {
Err error
Events []*BackendWatchEvent

Err error
}

type BackendWatchEvent struct {
Type BackendWatchEventType
Mapping *Mapping
Type BackendWatchEventType
}

type BackendWatchEventType string
Expand Down
19 changes: 16 additions & 3 deletions pkg/mappings/store/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,14 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
defer close(responseChan)

for event := range watchChan {
if event.Canceled {
switch {
case event.Canceled:
responseChan <- BackendWatchResponse{
Err: event.Err(),
}
} else if len(event.Events) > 0 {
case event.IsProgressNotify():
klog.FromContext(ctx).V(1).Info("received progress notify from etcd")
case len(event.Events) > 0:
retEvents := make([]*BackendWatchEvent, 0, len(event.Events))
for _, singleEvent := range event.Events {
var eventType BackendWatchEventType
Expand All @@ -70,7 +73,13 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {

// parse mapping
retMapping := &Mapping{}
err := json.Unmarshal(singleEvent.Kv.Value, retMapping)

value := singleEvent.Kv.Value
if len(value) == 0 && singleEvent.Type == mvccpb.DELETE && singleEvent.PrevKv != nil {
value = singleEvent.PrevKv.Value
}

err := json.Unmarshal(value, retMapping)
if err != nil {
klog.FromContext(ctx).Info(
"etcd backend: Error decoding event",
Expand All @@ -79,6 +88,10 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
"eventType", eventType,
"error", err.Error(),
)
// FIXME(ThomasK33): This leads to mapping leaks. Etcd might have
// already compacted the previous version. Thus we would never
// receive any information of the mapping that was deleted apart from its keys.
// And because there is no mapping, we are omitting deleting it from the mapping stores.

Check failure on line 94 in pkg/mappings/store/etcd_backend.go

View workflow job for this annotation

GitHub Actions / lint

`ommitting` is a misspelling of `omitting` (misspell)
continue
}

Expand Down
14 changes: 10 additions & 4 deletions pkg/mappings/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,13 @@ func (s *Store) start(ctx context.Context) error {
}

go func() {
wait.Until(func() {
wait.UntilWithContext(ctx, func(ctx context.Context) {
for watchEvent := range s.backend.Watch(ctx) {
s.handleEvent(ctx, watchEvent)
}

klog.FromContext(ctx).Info("mapping store watch has ended")
}, time.Second, ctx.Done())
}, time.Second)
}()

return nil
Expand All @@ -254,6 +254,12 @@ func (s *Store) handleEvent(ctx context.Context, watchEvent BackendWatchResponse
s.m.Lock()
defer s.m.Unlock()

klog.FromContext(ctx).V(1).Info(
"handling mapping store events",
"len", len(watchEvent.Events),
"err", watchEvent.Err,
)

if watchEvent.Err != nil {
klog.FromContext(ctx).Error(watchEvent.Err, "watch err in mappings store")
return
Expand Down Expand Up @@ -477,8 +483,8 @@ func (s *Store) DeleteMapping(ctx context.Context, nameMapping synccontext.NameM
}

func (s *Store) ReferencesTo(ctx context.Context, vObj synccontext.Object) []synccontext.NameMapping {
s.m.Lock()
defer s.m.Unlock()
s.m.RLock()
defer s.m.RUnlock()

retReferences := s.referencesTo(vObj)
klog.FromContext(ctx).V(1).Info("Found references for object", "object", vObj.String(), "references", len(retReferences))
Expand Down
20 changes: 8 additions & 12 deletions pkg/setup/controller_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/controllers/resources/nodes"
"github.com/loft-sh/vcluster/pkg/etcd"
"github.com/loft-sh/vcluster/pkg/mappings"
"github.com/loft-sh/vcluster/pkg/mappings/store"
"github.com/loft-sh/vcluster/pkg/mappings/store/verify"
Expand Down Expand Up @@ -362,10 +363,10 @@ func initControllerContext(
return nil, err
}

// etcdClient, err := etcd.NewFromConfig(ctx, vClusterOptions)
// if err != nil {
// return nil, fmt.Errorf("create etcd client: %w", err)
// }
etcdClient, err := etcd.NewFromConfig(ctx, vClusterOptions)
if err != nil {
return nil, fmt.Errorf("create etcd client: %w", err)
}

controllerContext := &synccontext.ControllerContext{
Context: ctx,
Expand All @@ -380,17 +381,12 @@ func initControllerContext(
Config: vClusterOptions,
}

// mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")))
// if err != nil {
// return nil, fmt.Errorf("start mapping store: %w", err)
// }

inMemoryMappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewMemoryBackend(), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")))
mappingStore, err := store.NewStoreWithVerifyMapping(ctx, virtualManager.GetClient(), localManager.GetClient(), store.NewEtcdBackend(etcdClient), verify.NewVerifyMapping(controllerContext.ToRegisterContext().ToSyncContext("verify-mapping")))
if err != nil {
return nil, fmt.Errorf("start in-memory mapping store: %w", err)
return nil, fmt.Errorf("start mapping store: %w", err)
}

controllerContext.Mappings = mappings.NewMappingsRegistry(inMemoryMappingStore)
controllerContext.Mappings = mappings.NewMappingsRegistry(mappingStore)
return controllerContext, nil
}

Expand Down
59 changes: 59 additions & 0 deletions vcluster.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
controlPlane:
  advanced:
    virtualScheduler:
      enabled: true
  backingStore:
    etcd:
      deploy:
        enabled: true
        statefulSet:
          image:
            tag: 3.5.14-0
  distro:
    k8s:
      apiServer:
        extraArgs:
          - --service-account-jwks-uri=https://kubernetes.default.svc.cluster.local/openid/v1/jwks
        image:
          tag: v1.31.1
      controllerManager:
        image:
          tag: v1.31.1
      enabled: true
      scheduler:
        image:
          tag: v1.31.1
  statefulSet:
    scheduling:
      podManagementPolicy: OrderedReady
    image:
      registry: ""
      repository: "vcluster"
      tag: "dev-conf"
    env:
      - name: DEBUG
        value: "true"

networking:
  advanced:
    proxyKubelets:
      byHostname: false
      byIP: false

sync:
  fromHost:
    csiDrivers:
      enabled: false
    csiStorageCapacities:
      enabled: false
    nodes:
      enabled: true
      selector:
        all: true
  toHost:
    persistentVolumes:
      enabled: true
    priorityClasses:
      enabled: true
    storageClasses:
      enabled: true

0 comments on commit ab31472

Please sign in to comment.