diff --git a/CHANGELOG.md b/CHANGELOG.md index f01d445..221bb45 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,8 @@ `tap` and `subscribe` commands * new option `--silent` for commands `tap` and `subscribe` which suppresses message output to stdout -* short `-o` otpion for the info command `--omit-empty` is no longer supported +* short `-o` option for the info command `--omit-empty` is no longer supported +* uniformly name test files `*_test.go` to improve external tool discoverbility ## v1.20 (2019-08-30) diff --git a/cmd/rabtap/cmd_exchange_integration_test.go b/cmd/rabtap/cmd_exchange_test.go similarity index 100% rename from cmd/rabtap/cmd_exchange_integration_test.go rename to cmd/rabtap/cmd_exchange_test.go diff --git a/cmd/rabtap/cmd_publish_integration_test.go b/cmd/rabtap/cmd_publish_test.go similarity index 100% rename from cmd/rabtap/cmd_publish_integration_test.go rename to cmd/rabtap/cmd_publish_test.go diff --git a/cmd/rabtap/cmd_queue_integration_test.go b/cmd/rabtap/cmd_queue_test.go similarity index 100% rename from cmd/rabtap/cmd_queue_integration_test.go rename to cmd/rabtap/cmd_queue_test.go diff --git a/cmd/rabtap/cmd_subscribe_integration_test.go b/cmd/rabtap/cmd_subscribe_test.go similarity index 100% rename from cmd/rabtap/cmd_subscribe_integration_test.go rename to cmd/rabtap/cmd_subscribe_test.go diff --git a/cmd/rabtap/cmd_tap_integration_test.go b/cmd/rabtap/cmd_tap_test.go similarity index 100% rename from cmd/rabtap/cmd_tap_integration_test.go rename to cmd/rabtap/cmd_tap_test.go diff --git a/pkg/amqp_connector_integration_test.go b/pkg/amqp_connector_test.go similarity index 100% rename from pkg/amqp_connector_integration_test.go rename to pkg/amqp_connector_test.go diff --git a/pkg/exchange_integration_test.go b/pkg/exchange_test.go similarity index 100% rename from pkg/exchange_integration_test.go rename to pkg/exchange_test.go diff --git a/pkg/publish_integration_test.go b/pkg/publish_test.go similarity index 100% rename from pkg/publish_integration_test.go rename to pkg/publish_test.go diff --git a/pkg/queue_integration_test.go b/pkg/queue_test.go similarity index 100% rename from pkg/queue_integration_test.go rename to pkg/queue_test.go diff --git a/pkg/session_integration_test.go b/pkg/session_test.go similarity index 100% rename from pkg/session_integration_test.go rename to pkg/session_test.go diff --git a/pkg/subscribe_integration_test.go b/pkg/subscribe_test.go similarity index 100% rename from pkg/subscribe_integration_test.go rename to pkg/subscribe_test.go diff --git a/pkg/tap_integration_test.go b/pkg/tap_integration_test.go deleted file mode 100644 index 11ae7da..0000000 --- a/pkg/tap_integration_test.go +++ /dev/null @@ -1,210 +0,0 @@ -// Copyright (C) 2017 Jan Delgado -// +build integration - -package rabtap - -// integration test functionality. assumes running rabbitmq broker on address -// defined by AMQP_URL and RABBIT_API_URL environment variables. -// (to start a local rabbitmq instance: -// $ sudo docker run --rm -ti -p5672:5672 rabbitmq:3-management) - -import ( - "context" - "crypto/tls" - "log" - "os" - "testing" - "time" - - "github.com/jandelgado/rabtap/pkg/testcommon" - "github.com/streadway/amqp" - "github.com/stretchr/testify/assert" -) - -const ( - MessagesPerTest = 5 - ResultTimeout = time.Second * 5 - TapReadyDelay = time.Millisecond * 500 -) - -func verifyMessagesOnTap(t *testing.T, consumer string, numExpected int, - tapExchangeName, tapQueueName string, - success chan<- int) *AmqpTap { - - tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) - resultChannel := make(TapChannel) - // TODO cancel and return cancel func - ctx, cancel := context.WithCancel(context.Background()) - go tap.EstablishTap( - ctx, - []ExchangeConfiguration{ - {tapExchangeName, tapQueueName}}, - resultChannel) - - func() { - numReceived := 0 - - // sample messages for 3 seconds and return number of returned messages - // through the success channel - for { - select { - case <-time.After(time.Second * 3): - success <- numReceived - return - case message := <-resultChannel: - if message.AmqpMessage != nil { - if string(message.AmqpMessage.Body) == "Hello" { - numReceived++ - } - } - } - } - }() - cancel() - return tap -} - -func requireIntFromChan(t *testing.T, c <-chan int, expected int) { - select { - case val := <-c: - assert.Equal(t, expected, val) - return - case <-time.After(ResultTimeout): - assert.Fail(t, "test result not received in expected time frame") - } -} - -func TestIntegrationHeadersExchange(t *testing.T) { - - // establish sending exchange - conn, ch := testcommon.IntegrationTestConnection(t, "headers-exchange", "headers", 2, true) - defer conn.Close() - - finishChan := make(chan int) - - // no binding key is needed for the headers exchange - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "headers-exchange", "", finishChan) - time.Sleep(TapReadyDelay) - - // inject messages into exchange. Each message should become visible - // in the tap-exchange defined above. We use a headers exchange so we - // must provide a amqp.Table struct with the messages headers, on which - // routing is based. See integrationTestConnection() on how the routing - // header is constructed. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "headers-exchange", "", amqp.Table{"header1": "test0"}) - - requireIntFromChan(t, finishChan, MessagesPerTest) - - // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) -} - -func TestIntegrationDirectExchange(t *testing.T) { - - // establish sending exchange - conn, ch := testcommon.IntegrationTestConnection(t, "direct-exchange", "direct", 2, false) - defer conn.Close() - - finishChan := make(chan int) - - // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest - - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "direct-exchange", "queue-0", finishChan) - - time.Sleep(TapReadyDelay) - - // inject messages into exchange. Each message should become visible - // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "direct-exchange", "queue-0", nil) - - requireIntFromChan(t, finishChan, MessagesPerTest) - - // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) -} - -// TestIntegrationTopicExchangeTapSingleQueue tests tapping to a topic -// exchange with a routing key so that only messages sent to one topic are -// tapped. -func TestIntegrationTopicExchangeTapSingleQueue(t *testing.T) { - - // establish sending exchange - conn, ch := testcommon.IntegrationTestConnection(t, "topic-exchange", "topic", 2, false) - defer conn.Close() - - finishChan := make(chan int) - - // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest - - // tap only messages routed to queue-0 - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "topic-exchange", "queue-0", finishChan) - - time.Sleep(TapReadyDelay) - - // inject messages into exchange. Each message should become visible - // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) - - requireIntFromChan(t, finishChan, MessagesPerTest) - - // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) - - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) -} - -// TestIntegrationTopicExchangeTapWildcard tests tapping to an exechange -// of type topic. The tap-exchange s bound with the binding-key '#'. -func TestIntegrationTopicExchangeTapWildcard(t *testing.T) { - - // establish sending exchange - conn, ch := testcommon.IntegrationTestConnection(t, "topic-exchange", "topic", 2, false) - defer conn.Close() - - finishChan := make(chan int) - - // connect a test-tap and check if we received the test message - MessagesPerTest := MessagesPerTest - - // tap all messages on the exchange - go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest*2, "topic-exchange", "#", finishChan) - - time.Sleep(TapReadyDelay) - - // inject messages into exchange. Each message should become visible - // in the tap-exchange defined above. - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) - testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) - - requireIntFromChan(t, finishChan, MessagesPerTest*2) - - // the original messages should also be delivered. - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) - - testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) - requireIntFromChan(t, finishChan, MessagesPerTest) -} - -// TestIntegrationInvalidExchange tries to tap to a non existing exhange, we -// expect an error returned. -func TestIntegrationInvalidExchange(t *testing.T) { - - tapMessages := make(TapChannel) - tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) - ctx := context.Background() - err := tap.EstablishTap( - ctx, - []ExchangeConfiguration{ - {"nonexisting-exchange", "test"}}, - tapMessages) - - assert.NotNil(t, err) -} diff --git a/pkg/tap_test.go b/pkg/tap_test.go index b6d3b67..48c97f8 100644 --- a/pkg/tap_test.go +++ b/pkg/tap_test.go @@ -1,13 +1,32 @@ // Copyright (C) 2017 Jan Delgado +// +build integration package rabtap +// integration test functionality. assumes running rabbitmq broker on address +// defined by AMQP_URL and RABBIT_API_URL environment variables. +// (to start a local rabbitmq instance: +// $ sudo docker run --rm -ti -p5672:5672 rabbitmq:3-management) + import ( + "context" + "crypto/tls" + "log" + "os" "testing" + "time" + "github.com/jandelgado/rabtap/pkg/testcommon" + "github.com/streadway/amqp" "github.com/stretchr/testify/assert" ) +const ( + MessagesPerTest = 5 + ResultTimeout = time.Second * 5 + TapReadyDelay = time.Millisecond * 500 +) + func TestGetTapQueueNameForExchange(t *testing.T) { assert.Equal(t, "__tap-queue-for-exchange-1234", @@ -19,3 +38,184 @@ func TestGetTapEchangeNameForExchange(t *testing.T) { assert.Equal(t, "__tap-exchange-for-exchange-1234", getTapExchangeNameForExchange("exchange", "1234")) } +func verifyMessagesOnTap(t *testing.T, consumer string, numExpected int, + tapExchangeName, tapQueueName string, + success chan<- int) *AmqpTap { + + tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) + resultChannel := make(TapChannel) + // TODO cancel and return cancel func + ctx, cancel := context.WithCancel(context.Background()) + go tap.EstablishTap( + ctx, + []ExchangeConfiguration{ + {tapExchangeName, tapQueueName}}, + resultChannel) + + func() { + numReceived := 0 + + // sample messages for 3 seconds and return number of returned messages + // through the success channel + for { + select { + case <-time.After(time.Second * 3): + success <- numReceived + return + case message := <-resultChannel: + if message.AmqpMessage != nil { + if string(message.AmqpMessage.Body) == "Hello" { + numReceived++ + } + } + } + } + }() + cancel() + return tap +} + +func requireIntFromChan(t *testing.T, c <-chan int, expected int) { + select { + case val := <-c: + assert.Equal(t, expected, val) + return + case <-time.After(ResultTimeout): + assert.Fail(t, "test result not received in expected time frame") + } +} + +func TestIntegrationHeadersExchange(t *testing.T) { + + // establish sending exchange + conn, ch := testcommon.IntegrationTestConnection(t, "headers-exchange", "headers", 2, true) + defer conn.Close() + + finishChan := make(chan int) + + // no binding key is needed for the headers exchange + go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "headers-exchange", "", finishChan) + time.Sleep(TapReadyDelay) + + // inject messages into exchange. Each message should become visible + // in the tap-exchange defined above. We use a headers exchange so we + // must provide a amqp.Table struct with the messages headers, on which + // routing is based. See integrationTestConnection() on how the routing + // header is constructed. + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "headers-exchange", "", amqp.Table{"header1": "test0"}) + + requireIntFromChan(t, finishChan, MessagesPerTest) + + // the original messages should also be delivered. + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) +} + +func TestIntegrationDirectExchange(t *testing.T) { + + // establish sending exchange + conn, ch := testcommon.IntegrationTestConnection(t, "direct-exchange", "direct", 2, false) + defer conn.Close() + + finishChan := make(chan int) + + // connect a test-tap and check if we received the test message + MessagesPerTest := MessagesPerTest + + go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "direct-exchange", "queue-0", finishChan) + + time.Sleep(TapReadyDelay) + + // inject messages into exchange. Each message should become visible + // in the tap-exchange defined above. + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "direct-exchange", "queue-0", nil) + + requireIntFromChan(t, finishChan, MessagesPerTest) + + // the original messages should also be delivered. + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) +} + +// TestIntegrationTopicExchangeTapSingleQueue tests tapping to a topic +// exchange with a routing key so that only messages sent to one topic are +// tapped. +func TestIntegrationTopicExchangeTapSingleQueue(t *testing.T) { + + // establish sending exchange + conn, ch := testcommon.IntegrationTestConnection(t, "topic-exchange", "topic", 2, false) + defer conn.Close() + + finishChan := make(chan int) + + // connect a test-tap and check if we received the test message + MessagesPerTest := MessagesPerTest + + // tap only messages routed to queue-0 + go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest, "topic-exchange", "queue-0", finishChan) + + time.Sleep(TapReadyDelay) + + // inject messages into exchange. Each message should become visible + // in the tap-exchange defined above. + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) + + requireIntFromChan(t, finishChan, MessagesPerTest) + + // the original messages should also be delivered. + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) + + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) +} + +// TestIntegrationTopicExchangeTapWildcard tests tapping to an exechange +// of type topic. The tap-exchange s bound with the binding-key '#'. +func TestIntegrationTopicExchangeTapWildcard(t *testing.T) { + + // establish sending exchange + conn, ch := testcommon.IntegrationTestConnection(t, "topic-exchange", "topic", 2, false) + defer conn.Close() + + finishChan := make(chan int) + + // connect a test-tap and check if we received the test message + MessagesPerTest := MessagesPerTest + + // tap all messages on the exchange + go verifyMessagesOnTap(t, "tap-consumer1", MessagesPerTest*2, "topic-exchange", "#", finishChan) + + time.Sleep(TapReadyDelay) + + // inject messages into exchange. Each message should become visible + // in the tap-exchange defined above. + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-0", nil) + testcommon.PublishTestMessages(t, ch, MessagesPerTest, "topic-exchange", "queue-1", nil) + + requireIntFromChan(t, finishChan, MessagesPerTest*2) + + // the original messages should also be delivered. + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer2", MessagesPerTest, "queue-0", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) + + testcommon.VerifyTestMessageOnQueue(t, ch, "consumer3", MessagesPerTest, "queue-1", finishChan) + requireIntFromChan(t, finishChan, MessagesPerTest) +} + +// TestIntegrationInvalidExchange tries to tap to a non existing exhange, we +// expect an error returned. +func TestIntegrationInvalidExchange(t *testing.T) { + + tapMessages := make(TapChannel) + tap := NewAmqpTap(testcommon.IntegrationURIFromEnv(), &tls.Config{}, log.New(os.Stderr, "", log.LstdFlags)) + ctx := context.Background() + err := tap.EstablishTap( + ctx, + []ExchangeConfiguration{ + {"nonexisting-exchange", "test"}}, + tapMessages) + + assert.NotNil(t, err) +}