diff --git a/CHANGELOG.md b/CHANGELOG.md index f1e773f..21484cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ # Changelog for rabtap +## v1.14 (2019-02-28) + +* change: in subscribe mode, the consumer will use non-exclusive mode, + allowing multiple consumers on the same queue. + ## v1.13 (2019-02-26) * updated go version to 1.12, dropping `dep` module manager diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index 57cdba7..11adeae 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -26,7 +26,7 @@ func cmdSubscribe(cmd CmdSubscribeArg) error { // this channel is used to decouple message receiving threads // with the main thread, which does the actual message processing messageChannel := make(rabtap.TapChannel) - subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, cmd.tlsConfig, log) + subscriber := rabtap.NewAmqpSubscriber(cmd.amqpURI, false, cmd.tlsConfig, log) defer subscriber.Close() go subscriber.EstablishSubscription(cmd.queue, messageChannel) return messageReceiveLoop(messageChannel, cmd.messageReceiveFunc, cmd.signalChannel) diff --git a/pkg/subscribe.go b/pkg/subscribe.go index 7bd0d57..1d1555b 100644 --- a/pkg/subscribe.go +++ b/pkg/subscribe.go @@ -14,13 +14,15 @@ import ( type AmqpSubscriber struct { connection *AmqpConnector logger logrus.StdLogger + exclusive bool } // NewAmqpSubscriber returns a new AmqpSubscriber object associated with the // RabbitMQ broker denoted by the uri parameter. -func NewAmqpSubscriber(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber { +func NewAmqpSubscriber(uri string, exclusive bool, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpSubscriber { return &AmqpSubscriber{ connection: NewAmqpConnector(uri, tlsConfig, logger), + exclusive: exclusive, logger: logger} } @@ -110,8 +112,8 @@ func (s *AmqpSubscriber) consumeMessages(conn *amqp.Connection, msgs, err := ch.Consume( queueName, "__rabtap-consumer-"+uuid.NewV4().String()[:8], // TODO param - true, // auto-ack - true, // exclusive + true, // auto-ack + s.exclusive, false, // no-local - unsupported false, // wait nil, // args diff --git a/pkg/subscribe_integration_test.go b/pkg/subscribe_integration_test.go index e6de851..34994df 100644 --- a/pkg/subscribe_integration_test.go +++ b/pkg/subscribe_integration_test.go @@ -30,7 +30,7 @@ func TestSubscribe(t *testing.T) { finishChan := make(chan int) - subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) + subscriber := NewAmqpSubscriber(testcommon.IntegrationURIFromEnv(), false, &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) defer subscriber.Close() resultChannel := make(TapChannel) go subscriber.EstablishSubscription(queueName, resultChannel) diff --git a/pkg/tap.go b/pkg/tap.go index 40e14e3..e1dc5fd 100644 --- a/pkg/tap.go +++ b/pkg/tap.go @@ -25,7 +25,7 @@ type AmqpTap struct { // broker denoted by the uri parameter. func NewAmqpTap(uri string, tlsConfig *tls.Config, logger logrus.StdLogger) *AmqpTap { return &AmqpTap{ - AmqpSubscriber: NewAmqpSubscriber(uri, tlsConfig, logger)} + AmqpSubscriber: NewAmqpSubscriber(uri, true /* exclusive */, tlsConfig, logger)} } func getTapExchangeNameForExchange(exchange, postfix string) string {