Skip to content

Commit

Permalink
Non exclusive consumer in subscribe mode (#7)
Browse files Browse the repository at this point in the history
use non-exclusive mode with sub command
  • Loading branch information
jandelgado authored Feb 28, 2019
1 parent 650070c commit 2919196
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 6 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@

# Changelog for rabtap

## v1.14 (2019-02-28)

* change: in subscribe mode, the consumer will use non-exclusive mode,
allowing multiple consumers on the same queue.

## v1.13 (2019-02-26)

* updated go version to 1.12, dropping `dep` module manager
Expand Down
2 changes: 1 addition & 1 deletion cmd/rabtap/cmd_subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func cmdSubscribe(cmd CmdSubscribeArg) error {
// this channel is used to decouple message receiving threads
// with the main thread, which does the actual message processing
messageChannel := make(rabtap.TapChannel)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log)
subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log)
defer subscriber.Close()
go subscriber.EstablishSubscription(cmd.queue, messageChannel)
return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel)
Expand Down
8 changes: 5 additions & 3 deletions pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ import (
type AmqpSubscriber struct {
connection *AmqpConnector
logger logrus.StdLogger
exclusive bool
}

// NewAmqpSubscriber returns a new AmqpSubscriber object associated with the
// RabbitMQ broker denoted by the uri parameter.
func NewAmqpSubscriber(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber {
func NewAmqpSubscriber(uri string, exclusive bool, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber {
return &AmqpSubscriber{
connection: NewAmqpConnector(uri, tlsConfig, logger),
exclusive: exclusive,
logger: logger}
}

Expand Down Expand Up @@ -110,8 +112,8 @@ func (s *AmqpSubscriber) consumeMessages(conn *amqp.Connection,
msgs, err := ch.Consume(
queueName,
"__rabtap-consumer-"+uuid.NewV4().String()[:8], // TODO param
true, // auto-ack
true, // exclusive
true, // auto-ack
s.exclusive,
false, // no-local - unsupported
false, // wait
nil, // args
Expand Down
2 changes: 1 addition & 1 deletion pkg/subscribe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSubscribe(t *testing.T) {

finishChan := make(chan int)

subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags))
subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), false, &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags))
defer subscriber.Close()
resultChannel := make(TapChannel)
go subscriber.EstablishSubscription(queueName, resultChannel)
Expand Down
2 changes: 1 addition & 1 deletion pkg/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type AmqpTap struct {
// broker denoted by the uri parameter.
func NewAmqpTap(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpTap {
return &AmqpTap{
AmqpSubscriber: NewAmqpSubscriber(uri, tlsConfig, logger)}
AmqpSubscriber: NewAmqpSubscriber(uri, true /* exclusive */, tlsConfig, logger)}
}

func getTapExchangeNameForExchange(exchange, postfix string) string {
Expand Down

0 comments on commit 2919196

Please sign in to comment.