Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve bus for large numbers of similar subscriptions #27

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 108 additions & 103 deletions client/bus/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,59 @@ import (
"sync"

"github.com/CyCoreSystems/ari/v5"
"github.com/CyCoreSystems/ari/v5/stdbus"
"github.com/inconshreveable/log15"
"github.com/pkg/errors"

"github.com/nats-io/nats.go"
)

// EventChanBufferLength is the number of unhandled events which can be queued
// to the event channel buffer before further events are lost.
var EventChanBufferLength = 10
// busWrapper binds a NATS subject to an ari.Bus, passing any received NATS messages to that bus
type busWrapper struct {
subject string

log log15.Logger

sub *nats.Subscription

bus ari.Bus
}

func newBusWrapper(subject string, nc *nats.EncodedConn, log log15.Logger) (*busWrapper, error) {
var err error

w := &busWrapper{
subject: subject,
log: log,
bus: stdbus.New(),
}

w.sub, err = nc.Subscribe(subject, func(m *nats.Msg) {
w.receive(m)
})
if err != nil {
return nil, errors.Wrapf(err, "failed to subscribe to NATS subject %s", subject)
}

return w, nil
}

func (w *busWrapper) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
w.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

w.bus.Send(e)
}

func (w *busWrapper) Close() {
if err := w.sub.Unsubscribe(); err != nil {
w.log.Error("failed to unsubscribe when closing NATS subscription:", err)
}
w.bus.Close()
}

// Bus provides an ari.Bus interface to NATS
type Bus struct {
Expand All @@ -21,14 +66,53 @@ type Bus struct {
log log15.Logger

nc *nats.EncodedConn

subjectBuses map[string]*busWrapper

mu sync.RWMutex
}

// New returns a new Bus
func New(prefix string, nc *nats.EncodedConn, log log15.Logger) *Bus {
return &Bus{
prefix: prefix,
log: log,
nc: nc,
prefix: prefix,
log: log,
nc: nc,
subjectBuses: make(map[string]*busWrapper),
}
}

type subBus struct {
bus ari.Bus
subs []ari.Subscription
}

func (b *subBus) Close() {
for _, s := range b.subs {
s.Cancel()
}
b.subs = nil

// NOTE: we are NOT closing the parent bus here and now, since it could be used by any number of other clients
// TODO: Ultimately, we will need to derive a way to check to see if the parent bus is then unused, in which case, the NATS subscription(s) should then be closed.
}

func (b *subBus) Send(e ari.Event) {
b.bus.Send(e)
}

func (b *subBus) Subscribe(key *ari.Key, eTypes ...string) ari.Subscription {
sub := b.bus.Subscribe(key, eTypes...)

b.subs = append(b.subs, sub)

return sub
}

// SubBus creates and returns a new ariBus which is subtended from this one
func (b *Bus) SubBus() ari.Bus {
return &subBus{
bus: b,
}
}

Expand All @@ -53,26 +137,15 @@ func (b *Bus) subjectFromKey(key *ari.Key) string {
return subj + key.Node
}

// Subscription represents an ari.Subscription over NATS
type Subscription struct {
key *ari.Key

log log15.Logger

subscription *nats.Subscription

eventChan chan ari.Event

events []string

closed bool

mu sync.RWMutex
}

// Close implements ari.Bus
func (b *Bus) Close() {
// No-op
b.mu.Lock()

for _, w := range b.subjectBuses {
w.Close()
}

b.mu.Unlock()
}

// Send implements ari.Bus
Expand All @@ -84,88 +157,20 @@ func (b *Bus) Send(e ari.Event) {
func (b *Bus) Subscribe(key *ari.Key, n ...string) ari.Subscription {
var err error

s := &Subscription{
key: key,
log: b.log,
eventChan: make(chan ari.Event, EventChanBufferLength),
events: n,
}

s.subscription, err = b.nc.Subscribe(b.subjectFromKey(key), func(m *nats.Msg) {
s.receive(m)
})
if err != nil {
b.log.Error("failed to subscribe to NATS", "error", err)
return nil
}
return s
}

// Events returns the channel on which events from this subscription will be sent
func (s *Subscription) Events() <-chan ari.Event {
return s.eventChan
}

// Cancel destroys the subscription
func (s *Subscription) Cancel() {
if s == nil {
return
}
subject := b.subjectFromKey(key)

if s.subscription != nil {
err := s.subscription.Unsubscribe()
b.mu.Lock()
w, ok := b.subjectBuses[subject]
if !ok {
w, err = newBusWrapper(subject, b.nc, b.log)
if err != nil {
s.log.Error("failed unsubscribe from NATS", "error", err)
}
}

s.mu.Lock()
if !s.closed {
s.closed = true
close(s.eventChan)
}
s.mu.Unlock()
}

func (s *Subscription) receive(o *nats.Msg) {
e, err := ari.DecodeEvent(o.Data)
if err != nil {
s.log.Error("failed to convert received message to ari.Event", "error", err)
return
}

if s.matchEvent(e) {
s.mu.RLock()
if !s.closed {
s.eventChan <- e
b.mu.Unlock()
b.log.Error("failed to create bus wrapper", "key", key, "error", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should unlock here before return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed; thanks

return nil
}
s.mu.RUnlock()
b.subjectBuses[subject] = w
}
}
b.mu.Unlock()

func (s *Subscription) matchEvent(o ari.Event) bool {
// First, filter by type
var match bool
for _, kind := range s.events {
if kind == o.GetType() || kind == ari.Events.All {
match = true
break
}
}
if !match {
return false
}

// If we don't have a resource ID, we match everything
// Next, match the entity
if s.key == nil || s.key.ID == "" {
return true
}

for _, k := range o.Keys() {
if s.key.Match(k) {
return true
}
}
return false
return w.bus.Subscribe(key, n...)
}
52 changes: 0 additions & 52 deletions client/bus/bus_test.go

This file was deleted.

14 changes: 10 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var ErrNil = errors.New("Nil")
// family of derived clients. It manages stateful elements such as the bus,
// the NATS connection, and the cluster membership
type core struct {
// bus is the core ari-proxy bus, which handles NATS subscription binding
bus *bus.Bus

// cluster describes the cluster of ARI proxies
cluster *cluster.Cluster

Expand Down Expand Up @@ -232,8 +235,11 @@ func New(ctx context.Context, opts ...OptionFunc) (*Client, error) {
return nil, errors.Wrap(err, "failed to start core")
}

// Create the bus
c.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)
// Create the core bus
c.core.bus = bus.New(c.core.prefix, c.core.nc, c.core.log)

// Extract a SubBus from that core bus (NOTE: must come after core is started so that NATS connection exists)
c.bus = c.core.bus.SubBus()

// Call Close whenever the context is closed
go func() {
Expand Down Expand Up @@ -262,7 +268,7 @@ func (c *Client) New(ctx context.Context) *Client {
appName: c.appName,
cancel: cancel,
core: c.core,
bus: bus.New(c.core.prefix, c.core.nc, c.core.log),
bus: c.core.bus.SubBus(),
}
}

Expand Down Expand Up @@ -320,7 +326,7 @@ func WithURI(uri string) OptionFunc {
// WithNATS binds an existing NATS connection
func WithNATS(nc *nats.EncodedConn) OptionFunc {
return func(c *Client) {
c.nc = nc
c.core.nc = nc
}
}

Expand Down