Skip to content

Commit

Permalink
wip(etcd): Turned delete into an unconditional delete
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Kosiewski <[email protected]>
  • Loading branch information
Thomas Kosiewski committed Oct 16, 2024
1 parent ce8c161 commit f451f40
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 96 deletions.
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ signs:
artifacts: checksum

snapshot:
name_template: "{{ incpatch .Version }}-next"
version_template: "{{ incpatch .Version }}-next"

changelog:
use: github
Expand Down
7 changes: 7 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,10 @@ gen-license-report:
go-licenses save --save_path=./licenses --ignore github.com/loft-sh ./...

cp -r ./licenses ./cmd/vclusterctl/cmd/credits

build-dev-image tag="":
TELEMETRY_PRIVATE_KEY="" goreleaser build --snapshot --clean

cp dist/vcluster_linux_$(go env GOARCH | sed s/amd64/amd64_v1/g)/vcluster ./vcluster
docker build -t vcluster:dev-{{tag}} -f Dockerfile.release --build-arg TARGETARCH=$(uname -m) --build-arg TARGETOS=linux .
rm ./vcluster
2 changes: 1 addition & 1 deletion cmd/vcluster/cmd/debug/etcd/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ExecuteKeys(ctx context.Context, options *KeysOptions) error {
}

// create new etcd backend & list mappings
keyValues, err := etcdClient.List(ctx, options.Prefix, 0)
keyValues, err := etcdClient.List(ctx, options.Prefix)
if err != nil {
return err
}
Expand Down
115 changes: 34 additions & 81 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,27 @@ package etcd
import (
"context"
"errors"
"fmt"

vconfig "github.com/loft-sh/vcluster/config"
"github.com/loft-sh/vcluster/pkg/config"
"github.com/loft-sh/vcluster/pkg/constants"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
)

type Value struct {
Key []byte
Data []byte
Revision int64
Key []byte
Data []byte
}

var (
ErrNotFound = errors.New("etcdwrapper: key not found")
)
var ErrNotFound = errors.New("etcdwrapper: key not found")

type Client interface {
List(ctx context.Context, key string, rev int) ([]Value, error)
Watch(ctx context.Context, key string, rev int) clientv3.WatchChan
List(ctx context.Context, key string) ([]Value, error)
Watch(ctx context.Context, key string) clientv3.WatchChan
Get(ctx context.Context, key string) (Value, error)
Put(ctx context.Context, key string, value []byte) error
Create(ctx context.Context, key string, value []byte) error
Update(ctx context.Context, key string, revision int64, value []byte) error
Delete(ctx context.Context, key string, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Delete(ctx context.Context, key string) error
Close() error
}

Expand Down Expand Up @@ -102,22 +96,21 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) (
}, nil
}

func (c *client) Watch(ctx context.Context, key string, rev int) clientv3.WatchChan {
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev)))
func (c *client) Watch(ctx context.Context, key string) clientv3.WatchChan {
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV())
}

func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev)))
func (c *client) List(ctx context.Context, key string) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix())
if err != nil {
return nil, err
}

var vals []Value
for _, kv := range resp.Kvs {
vals = append(vals, Value{
Key: kv.Key,
Data: kv.Value,
Revision: kv.ModRevision,
Key: kv.Key,
Data: kv.Value,
})
}

Expand All @@ -130,78 +123,38 @@ func (c *client) Get(ctx context.Context, key string) (Value, error) {
return Value{}, err
}

if len(resp.Kvs) == 0 {
return Value{}, ErrNotFound
}

if len(resp.Kvs) == 1 {
return Value{
Key: resp.Kvs[0].Key,
Data: resp.Kvs[0].Value,
Revision: resp.Kvs[0].ModRevision,
Key: resp.Kvs[0].Key,
Data: resp.Kvs[0].Value,
}, nil
}

return Value{}, ErrNotFound
}

func (c *client) Put(ctx context.Context, key string, value []byte) error {
val, err := c.Get(ctx, key)
if err != nil && !errors.Is(err, ErrNotFound) {
return err
}
if val.Revision == 0 {
return c.Create(ctx, key, value)
}
return c.Update(ctx, key, val.Revision, value)
}

func (c *client) Create(ctx context.Context, key string, value []byte) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", 0)).
Then(clientv3.OpPut(key, string(value))).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("key exists")
highestRevision := &mvccpb.KeyValue{ModRevision: -1}
for _, kv := range resp.Kvs {
if kv.ModRevision > highestRevision.ModRevision {
highestRevision = kv
}
}
return nil
}

func (c *client) Update(ctx context.Context, key string, revision int64, value []byte) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpPut(key, string(value))).
Else(clientv3.OpGet(key)).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("revision %d doesnt match", revision)
}
return nil
return Value{
Key: highestRevision.Key,
Data: highestRevision.Value,
}, nil
}

func (c *client) Delete(ctx context.Context, key string, revision int64) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpDelete(key)).
Else(clientv3.OpGet(key)).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("revision %d doesnt match", revision)
}
return nil
func (c *client) Put(ctx context.Context, key string, value []byte) error {
_, err := c.c.Put(ctx, key, string(value))
return err
}

func (c *client) Compact(ctx context.Context, revision int64) (int64, error) {
resp, err := c.c.Compact(ctx, revision)
if resp != nil {
return resp.Header.GetRevision(), err
}
return 0, err
func (c *client) Delete(ctx context.Context, key string) error {
_, err := c.c.Delete(ctx, key)
return err
}

func (c *client) Close() error {
Expand Down
25 changes: 16 additions & 9 deletions pkg/mappings/store/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ type etcdBackend struct {
}

func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) {
mappings, err := m.etcdClient.List(ctx, mappingsPrefix, 0)
mappings, err := m.etcdClient.List(ctx, mappingsPrefix)
if err != nil {
return nil, fmt.Errorf("list mappings")
return nil, fmt.Errorf("etcd backend: list mappings: %w", err)
}

retMappings := make([]*Mapping, 0, len(mappings))
for _, kv := range mappings {
retMapping := &Mapping{}
err = json.Unmarshal(kv.Data, retMapping)
if err != nil {
return nil, fmt.Errorf("parse mapping %s: %w", string(kv.Key), err)
return nil, fmt.Errorf("etcd backend: parse mapping %s: %w", string(kv.Key), err)
}

retMappings = append(retMappings, retMapping)
Expand All @@ -46,7 +46,7 @@ func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) {

func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
responseChan := make(chan BackendWatchResponse)
watchChan := m.etcdClient.Watch(ctx, mappingsPrefix, 0)
watchChan := m.etcdClient.Watch(ctx, mappingsPrefix)
go func() {
defer close(responseChan)

Expand All @@ -59,19 +59,26 @@ func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
retEvents := make([]*BackendWatchEvent, 0, len(event.Events))
for _, singleEvent := range event.Events {
var eventType BackendWatchEventType
if singleEvent.Type == mvccpb.PUT {
switch singleEvent.Type {
case mvccpb.PUT:
eventType = BackendWatchEventTypeUpdate
} else if singleEvent.Type == mvccpb.DELETE {
case mvccpb.DELETE:
eventType = BackendWatchEventTypeDelete
} else {
default:
continue
}

// parse mapping
retMapping := &Mapping{}
err := json.Unmarshal(singleEvent.Kv.Value, retMapping)
if err != nil {
klog.FromContext(ctx).Info("Error decoding event", "key", string(singleEvent.Kv.Key), "error", err.Error())
klog.FromContext(ctx).Info(
"etcd backend: Error decoding event",
"key", string(singleEvent.Kv.Key),
"singleEventValue", string(singleEvent.Kv.Value),
"eventType", eventType,
"error", err.Error(),
)
continue
}

Expand Down Expand Up @@ -101,7 +108,7 @@ func (m *etcdBackend) Save(ctx context.Context, mapping *Mapping) error {
}

func (m *etcdBackend) Delete(ctx context.Context, mapping *Mapping) error {
return m.etcdClient.Delete(ctx, mappingToKey(mapping), 0)
return m.etcdClient.Delete(ctx, mappingToKey(mapping))
}

func mappingToKey(mapping *Mapping) string {
Expand Down
29 changes: 25 additions & 4 deletions pkg/mappings/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -167,12 +168,30 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
// set kind & apiVersion if unstructured
uObject, ok := obj.(*unstructured.Unstructured)
if ok {
uObject.SetKind(nameMapping.GroupVersionKind.Kind)
uObject.SetKind(nameMapping.Kind)
uObject.SetAPIVersion(nameMapping.GroupVersionKind.GroupVersion().String())
}

// check if virtual object exists
err = s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object))
retriableErrors := func(env string) func(error) bool {
return func(err error) bool {
if kerrors.IsNotFound(err) {
return false
}

obj := obj.DeepCopyObject().(client.Object)
klog.FromContext(ctx).Info("failed to check if object exists in the "+env,
"namespace", obj.GetNamespace(),
"name", obj.GetName(),
"err", err,
)
return kerrors.IsTimeout(err) || kerrors.IsInternalError(err) || kerrors.IsServerTimeout(err)
}
}

err = retry.OnError(retry.DefaultBackoff, retriableErrors("virtual cluster"), func() error {
return s.cachedVirtualClient.Get(ctx, nameMapping.VirtualName, obj.DeepCopyObject().(client.Object))
})
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
Expand All @@ -181,7 +200,9 @@ func (s *Store) objectExists(ctx context.Context, nameMapping synccontext.NameMa
}

// check if host object exists
err = s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object))
err = retry.OnError(retry.DefaultBackoff, retriableErrors("host cluster"), func() error {
return s.cachedHostClient.Get(ctx, nameMapping.HostName, obj.DeepCopyObject().(client.Object))
})
if err == nil {
return true, nil
} else if !kerrors.IsNotFound(err) {
Expand Down Expand Up @@ -377,7 +398,7 @@ func (s *Store) AddReference(ctx context.Context, nameMapping, belongsTo synccon
}

// check if we need to add mapping
if mapping.NameMapping.Equals(nameMapping) {
if mapping.Equals(nameMapping) {
return nil
}

Expand Down

0 comments on commit f451f40

Please sign in to comment.