Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Stackweb #15

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
37 changes: 37 additions & 0 deletions broker/nats/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package nats

import (
"context"

"github.com/stack-labs/stack-rpc/broker"
)

// setSubscribeOption returns a function to setup a context with given value
func setSubscribeOption(k, v interface{}) broker.SubscribeOption {
return func(o *broker.SubscribeOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

// setBrokerOption returns a function to setup a context with given value
func setBrokerOption(k, v interface{}) broker.Option {
return func(o *broker.Options) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}

// setPublishOption returns a function to setup a context with given value
func setPublishOption(k, v interface{}) broker.PublishOption {
return func(o *broker.PublishOptions) {
if o.Context == nil {
o.Context = context.Background()
}
o.Context = context.WithValue(o.Context, k, v)
}
}
11 changes: 11 additions & 0 deletions broker/nats/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/stack-labs/stack-rpc-plugins/broker/nats

go 1.14

replace (
github.com/stack-labs/stack-rpc v1.0.0 => ../../../stack-rpc
)

require (
github.com/stack-labs/stack-rpc v1.0.0
)
256 changes: 256 additions & 0 deletions broker/nats/nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
// Package nats provides a NATS broker
package nats

import (
"context"
"errors"
"strings"
"sync"

nats "github.com/nats-io/nats.go"
"github.com/stack-labs/stack-rpc/broker"
"github.com/stack-labs/stack-rpc/codec/json"
)

type natsBroker struct {
sync.Once
sync.RWMutex
addrs []string
conn *nats.Conn
opts broker.Options
nopts nats.Options
drain bool
closeCh chan (error)
}

type subscriber struct {
s *nats.Subscription
opts broker.SubscribeOptions
}

type publication struct {
t string
m *broker.Message
}

func (p *publication) Topic() string {
return p.t
}

func (p *publication) Message() *broker.Message {
return p.m
}

func (p *publication) Ack() error {
// nats does not support acking
return nil
}

func (s *subscriber) Options() broker.SubscribeOptions {
return s.opts
}

func (s *subscriber) Topic() string {
return s.s.Subject
}

func (s *subscriber) Unsubscribe() error {
return s.s.Unsubscribe()
}

func (n *natsBroker) Address() string {
if n.conn != nil && n.conn.IsConnected() {
return n.conn.ConnectedUrl()
}
if len(n.addrs) > 0 {
return n.addrs[0]
}

return ""
}

func setAddrs(addrs []string) []string {
//nolint:prealloc
var cAddrs []string
for _, addr := range addrs {
if len(addr) == 0 {
continue
}
if !strings.HasPrefix(addr, "nats://") {
addr = "nats://" + addr
}
cAddrs = append(cAddrs, addr)
}
if len(cAddrs) == 0 {
cAddrs = []string{nats.DefaultURL}
}
return cAddrs
}

func (n *natsBroker) Connect() error {
n.Lock()
defer n.Unlock()

status := nats.CLOSED
if n.conn != nil {
status = n.conn.Status()
}

switch status {
case nats.CONNECTED, nats.RECONNECTING, nats.CONNECTING:
return nil
default: // DISCONNECTED or CLOSED or DRAINING
opts := n.nopts
opts.Servers = n.addrs
opts.Secure = n.opts.Secure
opts.TLSConfig = n.opts.TLSConfig

// secure might not be set
if n.opts.TLSConfig != nil {
opts.Secure = true
}

c, err := opts.Connect()
if err != nil {
return err
}
n.conn = c
return nil
}
}

func (n *natsBroker) Disconnect() error {
n.RLock()
defer n.RUnlock()
if n.drain {
n.conn.Drain()
return <-n.closeCh
}
n.conn.Close()
return nil
}

func (n *natsBroker) Init(opts ...broker.Option) error {
n.setOption(opts...)
return nil
}

func (n *natsBroker) Options() broker.Options {
return n.opts
}

func (n *natsBroker) Publish(topic string, msg *broker.Message, opts ...broker.PublishOption) error {
b, err := n.opts.Codec.Marshal(msg)
if err != nil {
return err
}
n.RLock()
defer n.RUnlock()
return n.conn.Publish(topic, b)
}

func (n *natsBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
if n.conn == nil {
return nil, errors.New("not connected")
}

opt := broker.SubscribeOptions{
AutoAck: true,
Context: context.Background(),
}

for _, o := range opts {
o(&opt)
}

fn := func(msg *nats.Msg) {
var m broker.Message
if err := n.opts.Codec.Unmarshal(msg.Data, &m); err != nil {
return
}
handler(&publication{m: &m, t: msg.Subject})
}

var sub *nats.Subscription
var err error

n.RLock()
if len(opt.Queue) > 0 {
sub, err = n.conn.QueueSubscribe(topic, opt.Queue, fn)
} else {
sub, err = n.conn.Subscribe(topic, fn)
}
n.RUnlock()
if err != nil {
return nil, err
}
return &subscriber{s: sub, opts: opt}, nil
}

func (n *natsBroker) String() string {
return "nats"
}

func NewBroker(opts ...broker.Option) broker.Broker {
options := broker.Options{
// Default codec
Codec: json.Marshaler{},
Context: context.Background(),
}

n := &natsBroker{
opts: options,
}
n.setOption(opts...)

return n
}

func (n *natsBroker) setOption(opts ...broker.Option) {
for _, o := range opts {
o(&n.opts)
}

n.Once.Do(func() {
n.nopts = nats.GetDefaultOptions()
})

if nopts, ok := n.opts.Context.Value(optionsKey{}).(nats.Options); ok {
n.nopts = nopts
}

// broker.Options have higher priority than nats.Options
// only if Addrs, Secure or TLSConfig were not set through a broker.Option
// we read them from nats.Option
if len(n.opts.Addrs) == 0 {
n.opts.Addrs = n.nopts.Servers
}

if !n.opts.Secure {
n.opts.Secure = n.nopts.Secure
}

if n.opts.TLSConfig == nil {
n.opts.TLSConfig = n.nopts.TLSConfig
}
n.addrs = setAddrs(n.opts.Addrs)

if n.opts.Context.Value(drainConnectionKey{}) != nil {
n.drain = true
n.closeCh = make(chan error)
n.nopts.ClosedCB = n.onClose
n.nopts.AsyncErrorCB = n.onAsyncError
}
}

func (n *natsBroker) onClose(conn *nats.Conn) {
n.closeCh <- nil
}

func (n *natsBroker) onAsyncError(conn *nats.Conn, sub *nats.Subscription, err error) {
// There are kinds of different async error nats might callback, but we are interested
// in ErrDrainTimeout only here.
if err == nats.ErrDrainTimeout {
n.closeCh <- err
}
}
Loading