Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Repair Natsjskv Registry #9620

Merged
merged 2 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Bugfix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9620
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,8 @@ replace github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-2

replace github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c

replace github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf

// exclude the v2 line of go-sqlite3 which was released accidentally and prevents pulling in newer versions of go-sqlite3
// see https://github.com/mattn/go-sqlite3/issues/965 for more details
exclude github.com/mattn/go-sqlite3 v2.0.3+incompatible
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1218,8 +1218,6 @@ github.com/go-micro/plugins/v4/server/http v1.2.2 h1:UK2/09AU0zV3wHELuR72TZzVU2v
github.com/go-micro/plugins/v4/server/http v1.2.2/go.mod h1:YuAjaSPxcn3LI8j2FUsqx0Rxunrj4YwDV41Ax76rLl0=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0 h1:Qa1EBQ9UyCGecFAJQovl/MHGnvbcvDaM3qUoAG5Lnvk=
github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0/go.mod h1:aCRl8JQmqIaonOl88nFPY/BOQnHPVHY9ngStzLkXnYk=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e h1:hwH0qXT0J3UFYRi0UD+e3ItL92oW+jdPFA+3o/j6ASg=
github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e/go.mod h1:Goi4eJ9SrKkxE6NsAVqBVNxfQFbwb7UbyII6743ldgM=
github.com/go-micro/plugins/v4/store/redis v1.2.1 h1:d9kwr9bSpoK9vkHkqcv+isQUbgBCHpfwCV57pcAPS6c=
github.com/go-micro/plugins/v4/store/redis v1.2.1/go.mod h1:MbCG0YiyPqETTtm7uHFmxQNCaW1o9hBoYtFwhbVjLUg=
github.com/go-micro/plugins/v4/transport/grpc v1.1.0 h1:mXfDYfFQLnVDzjGY3o84oe4prfux9h8txsnA19dKsj8=
Expand Down Expand Up @@ -1613,6 +1611,8 @@ github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa02
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf h1:X4Hm7mZFAE+vJZ62mcXuH9BywmKiAr9B4V5LQLcTr70=
github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf/go.mod h1:pjcozWijkNPbEtX5SIQaxEW/h8VAVZYTLx+70bmB3LY=
github.com/kolo/xmlrpc v0.0.0-20200310150728-e0350524596b/go.mod h1:o03bZfuBwAXHetKXuInt4S7omeXUu62/A845kiycsSQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down
82 changes: 51 additions & 31 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/store"
Expand All @@ -23,6 +24,8 @@ var (
_registryAddressEnv = "MICRO_REGISTRY_ADDRESS"
_registryUsernameEnv = "MICRO_REGISTRY_AUTH_USERNAME"
_registryPasswordEnv = "MICRO_REGISTRY_AUTH_PASSWORD"

_serviceDelimiter = "/"
)

func init() {
Expand Down Expand Up @@ -80,76 +83,93 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
if s == nil {
return errors.New("wont store nil service")
}

unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique

b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + _serviceDelimiter + unique,
Value: b,
Expiry: n.expiry,
})
}

// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

return n.store.Delete(s.Name)
var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}

return n.store.Delete(s.Name + _serviceDelimiter + unique)
}

// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
// avoid listing e.g. `webfinger` when requesting `web` by adding the delimiter to the service name
return n.listServices(store.ListPrefix(s + _serviceDelimiter))
}

// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
return n.listServices()
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return NewWatcher(n)
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
}

func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

keys, err := n.store.List()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}

var svcs []*registry.Service
svcs := make([]*registry.Service, 0, len(keys))
for _, k := range keys {
s, err := n.GetService(k)
s, err := n.getService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)
svcs = append(svcs, s)

}
return svcs, nil
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
func (n *storeregistry) getService(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}

func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
Expand Down
74 changes: 74 additions & 0 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package natsjsregistry

import (
"errors"

"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)

// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
WatchAll(bucket string, opts ...nats.WatchOpt) (nats.KeyWatcher, error)
}

// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
reg *storeregistry
}

// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}

watcher, err := w.WatchAll("service-registry")
if err != nil {
return nil, err
}

return &Watcher{
watch: watcher,
updates: watcher.Updates(),
reg: s,
}, nil
}

// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}

service, err := w.reg.getService(kve.Key())
if err != nil {
return nil, err
}

var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}

return &registry.Result{
Service: service,
Action: action,
}, nil
}

// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
}
16 changes: 15 additions & 1 deletion vendor/github.com/go-micro/plugins/v4/store/nats-js-kv/nats.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,7 @@ github.com/go-micro/plugins/v4/server/http
# github.com/go-micro/plugins/v4/store/nats-js v1.2.1-0.20231129143103-d72facc652f0
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e
# github.com/go-micro/plugins/v4/store/nats-js-kv v0.0.0-20231226212146-94a49ba3e06e => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf
## explicit; go 1.21
github.com/go-micro/plugins/v4/store/nats-js-kv
# github.com/go-micro/plugins/v4/store/redis v1.2.1
Expand Down Expand Up @@ -2435,3 +2435,4 @@ stash.kopano.io/kgol/rndm
# github.com/studio-b12/gowebdav => github.com/aduffeck/gowebdav v0.0.0-20231215102054-212d4a4374f6
# github.com/egirna/icap-client => github.com/fschade/icap-client v0.0.0-20240123094924-5af178158eaf
# github.com/unrolled/secure => github.com/DeepDiver1975/secure v0.0.0-20240611112133-abc838fb797c
# github.com/go-micro/plugins/v4/store/nats-js-kv => github.com/kobergj/plugins/v4/store/nats-js-kv v0.0.0-20240716134540-6dfbf5819fbf