From 1fdeb0de2c78d3406f5ec155ddd29b618bb3daa1 Mon Sep 17 00:00:00 2001 From: jandelgado Date: Tue, 28 Jan 2020 19:25:18 +0100 Subject: [PATCH] replay messages as recorded (#39) --- CHANGELOG.md | 8 + Makefile | 9 +- README.md | 102 ++++++--- cmd/rabtap/cmd_exchange_test.go | 46 +++- cmd/rabtap/cmd_publish.go | 112 +++++----- cmd/rabtap/cmd_publish_test.go | 225 ++++++++++++++++++-- cmd/rabtap/cmd_queue_test.go | 31 +-- cmd/rabtap/cmd_replay_test.go | 3 + cmd/rabtap/cmd_subscribe.go | 3 - cmd/rabtap/cmd_subscribe_test.go | 75 +++++-- cmd/rabtap/cmd_tap.go | 4 - cmd/rabtap/cmd_tap_test.go | 38 ++++ cmd/rabtap/command_line.go | 99 ++++++--- cmd/rabtap/command_line_test.go | 75 +++++-- cmd/rabtap/main.go | 57 ++++- cmd/rabtap/main_test.go | 5 + cmd/rabtap/message_reader_dir.go | 137 ++++++++++++ cmd/rabtap/message_reader_dir_test.go | 283 +++++++++++++++++++++++++ cmd/rabtap/message_reader_file.go | 71 +++++++ cmd/rabtap/message_reader_file_test.go | 137 ++++++++++++ cmd/rabtap/message_writer.go | 2 +- go.mod | 4 +- go.sum | 13 ++ pkg/amqp_connector.go | 7 +- pkg/amqp_message_loop.go | 2 +- pkg/amqp_message_loop_test.go | 3 +- pkg/session.go | 8 +- pkg/session_test.go | 48 ++++- 28 files changed, 1370 insertions(+), 237 deletions(-) create mode 100644 cmd/rabtap/cmd_replay_test.go create mode 100644 cmd/rabtap/message_reader_dir.go create mode 100644 cmd/rabtap/message_reader_dir_test.go create mode 100644 cmd/rabtap/message_reader_file.go create mode 100644 cmd/rabtap/message_reader_file_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 221bb45..94c3eb9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,14 @@ # Changelog for rabtap +## v1.22 (2020-01-28) + +* The `pub` command now allows ialso to replay messages from a direcory previously + recorded. The pub command also honors the recorded timestamps and delays the + messages during replay. The signature of of the `pub` command was changed + (see README.md). Note that the exchange is now optional and will be taken + from the message metadata that is published. + ## v1.21 (2019-12-14) * new option: `--format FORMAT` which controls output format in `tap`, diff --git a/Makefile b/Makefile index 547dbc5..9afe144 100644 --- a/Makefile +++ b/Makefile @@ -15,15 +15,15 @@ build-all: build build-mac build-win build-mac: cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags \ - "-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_DARWIN64) + "-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_DARWIN64) build-linux: cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags \ - "-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_LINUX64) + "-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_LINUX64) build-win: cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags \ - "-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_WIN64) + "-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_WIN64) tags: $(SOURCE) @gotags -f tags $(SOURCE) @@ -58,8 +58,7 @@ toxiproxy-cmd: # run rabbitmq server for integration test using docker container. run-broker: - sudo docker run -ti --rm -p 5672:5672 \ - -p 15672:15672 rabbitmq:3-management + podman run -ti --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management dist-clean: clean rm -f *.out $(BINARY_WIN64) $(BINARY_LINUX64) $(BINARY_DARWIN64) diff --git a/README.md b/README.md index ee40302..1330adc 100644 --- a/README.md +++ b/README.md @@ -130,7 +130,8 @@ Usage: rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [-jknsv] rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [-jknsv] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn] - rabtap pub [--uri=URI] EXCHANGE [FILE] [--routingkey=KEY] [--format=FORMAT] [-jkv] + rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT] + [--delay=DELAY | --speed=FACTOR] [-jkv] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv] rabtap exchange rm EXCHANGE [--uri=URI] [-kv] rabtap queue create QUEUE [--uri=URI] [-adkv] @@ -141,26 +142,32 @@ Usage: rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] rabtap --version -Options: +Arguments and options: EXCHANGES comma-separated list of exchanges and binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. EXCHANGE name of an exchange, e.g. amq.direct. - FILE file to publish in pub mode. If omitted, stdin will be read. + SOURCE file or directory to publish in pub mode. If omitted, stdin will be read. QUEUE name of a queue. CONNECTION name of a connection. + DIR directory to read messages from. -a, --autodelete create auto delete exchange/queue. --api=APIURI connect to given API server. If APIURI is omitted, the environment variable RABTAP_APIURI will be used. -b, --bindingkey=KEY binding key to use in bind queue command. --by-connection output of info command starts with connections. --consumers include consumers and connections in output of info command. + --delay=DELAY Time to wait between sending messages during publish. + If not set then messages will be delayed as recorded. + The value must be suffixed with a time unit, e.g. ms, s etc. -d, --durable create durable exchange/queue. + --exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will + be taken from message being published (see JSON message format). --filter=EXPR Predicate for info command to filter queues [default: true] --format=FORMAT * for tap, pub, sub command: format to write/read messages to console - and optionally to file (when --saveto DIR is given). - Valid options are: "raw", "json", "json-nopp". Default: raw + and optionally to file (when --saveto DIR is given). + Valid options are: "raw", "json", "json-nopp". Default: raw * for info command: controls generated output format. Valid - options are: "text", "dot". Default: text + options are: "text", "dot". Default: text -h, --help print this help. -j, --json Deprecated. Use "--format json" instead. -k, --insecure allow insecure TLS connections (no certificate check). @@ -172,10 +179,13 @@ Options: when the channel is closed. --omit-empty don't show echanges without bindings in info command. --reason=REASON reason why the connection was closed [default: closed by rabtap]. - -r, --routingkey=KEY routing key to use in publish mode. + -r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key + will be taken from message being published (see JSON + message format). --saveto=DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. -s, --silent suppress message output to stdout. + --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. -t, --type=TYPE exchange type [default: fanout]. --uri=URI connect to given AQMP broker. If omitted, the @@ -200,7 +210,7 @@ Examples: # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api export RABTAP_APIURI=http://guest:guest@localhost:15672/api rabtap info - rabtap info --filter "binding.Source == 'amq.topic'" -o + rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672" ``` @@ -213,7 +223,7 @@ Rabtap understands the following commands: binding). Simulatanous * `sub` - subscribes to a queue and consumes messages sent to the queue (acts like a RabbitMQ consumer) -* `pub` - send messages to an exchange. +* `pub` - publish messages to an exchange, optionally with the timing as recorded. * `info` - show broker related info (exchanges, queues, bindings, stats). The features of an exchange are displayed in square brackets with `D` (durable), `AD` (auto delete) and `I` (internal). The features of a queue are displayed @@ -402,17 +412,34 @@ Example assumes that `RABTAP_AMQPURI` environment variable is set, as the #### Publish messages The `pub` command is used to publish messages to an exchange with a routing -key. Messages can be published either in raw format, in which they are send -as-is, or in [JSON-format, as described here](#json-message-format), which -includes message metadata and the body in a single JSON document. - -* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to - exchange amqp.fanout -* `$ rabtap pub amq.direct -r routingKey message.json --format json` - publish - message(s) in JSON format to exchange `amq.direct` with routing key - `routingKey`. -* `$ rabtap pub amq.direct -r routingKey --json < message.json` - same - as above, but read message(s) from stdin. +key. The messages to be published are either read from a file, or from a +directory which contains previously recorded messages (e.g. using the +`--saveto` option of the `tap` command). Messages can be published either in +raw format, in which they are send as-is, or in [JSON-format, as described +here](#json-message-format), which includes message metadata and the body in a +single JSON document. + +The general form of the `pub` command is +``` +rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT] + [--delay=DELAY | --speed=FACTOR] [-jkv] +``` + +* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to exchange amqp.fanout +* `$ rabtap pub messages.json --format=json` - messages are read from file `messages.json` + in [raptab JSON format](#json-message-format). Target exchange and routing + keys are read from the messages meta data. The `messages.json` file can + contain multiple JSON documents as it is treated as a JSON stream. Rabtap + will honor the `XRabtapReceived` timestamps of the messages and by default + delay the messages as they were recorded. This behaviour can be overridden + by the `--delay` and `--speed` options. +* `$ rabtap pub amq.direct -r myKey --format=json messages.json --delay=0s` - as + before, but publish messages always to exchange `amq.direct` with routing key + `myKey` and without any delays. +* `$ rabtap pub amq.direct -r myKey --format=raw somedir --delay=0s` - as + before, but assuming that `somedir` is a directory, the messages are read + from message files previously recorded to this directory and replayed in the + order they were recorded. #### Poor mans shovel @@ -523,7 +550,6 @@ messages in the following format: } ... ``` - Note that in JSON mode, the `Body` is base64 encoded. ## Filtering output of info command @@ -563,14 +589,14 @@ available in the expression as variables: The examples assume that `RABTAP_APIURI` environment variable points to the broker to be used, e.g. `http://guest:guest@localhost:15672/api`). -* `rabtap info --filter "exchange.Name == 'amq.direct'" --omit-empty`: print +* `rabtap info --filter "exchange.Name == 'amq.direct'" --omit-empty` - print only queues bound to exchange `amq.direct` and skip all empty exchanges. -* `rabtap info --filter "queue.Name =~ '.*test.*'" --omit-empty`: print all +* `rabtap info --filter "queue.Name =~ '.*test.*'" --omit-empty` - print all queues with `test` in their name. -* `rabtap info --filter "queue.Name =~ '.*test.*' && exchange.Type == 'topic'" --omit-empty`: like +* `rabtap info --filter "queue.Name =~ '.*test.*' && exchange.Type == 'topic'" --omit-empty` - like before, but consider only exchanges of type `topic`. -* `rabtap info --filter "queue.Consumers > 0" --omit --stats --consumers`: print all queues with at - one consumer +* `rabtap info --filter "queue.Consumers > 0" --omit --stats --consumers` - print + all queues with at least one consumer ### Type reference @@ -580,6 +606,9 @@ transformed to golang types. #### Exchange type +
+ Definition of the Exchange type + ```go type Exchange struct { Name string @@ -600,9 +629,13 @@ type Exchange struct { } } ``` +
#### Queue type +
+ Definition of the Queue type + ```go type Queue struct { MessagesDetails struct { @@ -664,9 +697,13 @@ type Queue struct { Memory int } ``` +
#### Binding type +
+ Definition of the Binding type + ```go type Binding struct { Source string @@ -678,25 +715,22 @@ type Binding struct { } ``` +
+ ## Build from source ### Download and build using go get + ``` $ GO111MODULE=on go get github.com/jandelgado/rabtap/cmd/rabtap ``` ### Build using Makefile and tests -To build rabtap from source, you need [go](https://golang.org/) (version >= 12) -and the following tools installed: - -* [ineffassign](https://github.com/gordonklaus/ineffassign) -* [misspell](https://github.com/client9/misspell/cmd/misspell) -* [golint](https://github.com/golang/lint) -* [gocyclo](https://github.com/fzipp/gocyclo) +To build rabtap from source, you need [go](https://golang.org/) (version >= 1.12) +and [golangci-lint](https://github.com/golangci/golangci-lint) installed. ``` -$ export GO111MODULE=on $ git clone https://github.com/jandelgado/rabtap && cd rabtap $ make test -or- make short-test $ make diff --git a/cmd/rabtap/cmd_exchange_test.go b/cmd/rabtap/cmd_exchange_test.go index 7bdc0af..007c049 100644 --- a/cmd/rabtap/cmd_exchange_test.go +++ b/cmd/rabtap/cmd_exchange_test.go @@ -4,4 +4,48 @@ package main -// TODO +import ( + "crypto/tls" + "net/url" + "os" + "testing" + "time" + + rabtap "github.com/jandelgado/rabtap/pkg" + "github.com/jandelgado/rabtap/pkg/testcommon" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// exchangeExists queries the API to check if a given exchange exists +func exchangeExists(t *testing.T, apiURL *url.URL, exchange string) bool { + // TODO add a simple client to testcommon + client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{}) + exchanges, err := client.Exchanges() + require.Nil(t, err) + return rabtap.FindExchangeByName(exchanges, "/", exchange) != -1 +} + +func TestIntegrationCmdExchangeCreateRemoveExchange(t *testing.T) { + // integration tests creation and removal of exchange + oldArgs := os.Args + defer func() { os.Args = oldArgs }() + + const testExchange = "cmd-exchange-test" + + amqpURI := testcommon.IntegrationURIFromEnv() + apiURL, _ := url.Parse(testcommon.IntegrationAPIURIFromEnv()) + + assert.False(t, exchangeExists(t, apiURL, testExchange)) + os.Args = []string{"rabtap", "exchange", "create", testExchange, "--uri", amqpURI} + main() + time.Sleep(2 * time.Second) + assert.True(t, exchangeExists(t, apiURL, testExchange)) + + // TODO validation + + os.Args = []string{"rabtap", "exchange", "rm", testExchange, "--uri", amqpURI} + main() + time.Sleep(2 * time.Second) + assert.False(t, exchangeExists(t, apiURL, testExchange)) +} diff --git a/cmd/rabtap/cmd_publish.go b/cmd/rabtap/cmd_publish.go index 6380fde..9957d1b 100644 --- a/cmd/rabtap/cmd_publish.go +++ b/cmd/rabtap/cmd_publish.go @@ -1,32 +1,51 @@ -// Copyright (C) 2017 Jan Delgado +// publish messages +// Copyright (C) 2017-2019 Jan Delgado package main import ( "context" "crypto/tls" - "encoding/json" - "fmt" "io" - "io/ioutil" + "time" rabtap "github.com/jandelgado/rabtap/pkg" "github.com/streadway/amqp" "golang.org/x/sync/errgroup" ) -// MessageReaderFunc provides messages that can be sent to an exchange. -// returns the message to be published, a flag if more messages are to be read, -// and an error. -type MessageReaderFunc func() (amqp.Publishing, bool, error) - // CmdPublishArg contains arguments for the publish command type CmdPublishArg struct { amqpURI string tlsConfig *tls.Config - exchange string - routingKey string + exchange *string + routingKey *string readerFunc MessageReaderFunc + speed float64 + fixedDelay *time.Duration +} + +type DelayFunc func(first, second *RabtapPersistentMessage) + +func multDuration(duration time.Duration, factor float64) time.Duration { + d := float64(duration.Nanoseconds()) * factor + return time.Duration(int(d)) +} + +// durationBetweenMessages calculates the delay to make between the +// publishing of two previously recorded messages. +func durationBetweenMessages(first, second *RabtapPersistentMessage, + speed float64, fixedDelay *time.Duration) time.Duration { + if first == nil || second == nil { + return time.Duration(0) + } + if fixedDelay != nil { + return *fixedDelay + } + firstTs := first.XRabtapReceivedTimestamp + secondTs := second.XRabtapReceivedTimestamp + delta := secondTs.Sub(firstTs) + return multDuration(delta, speed) } // publishMessage publishes a single message on the given exchange with the @@ -35,8 +54,8 @@ func publishMessage(publishChannel rabtap.PublishChannel, exchange, routingKey string, amqpPublishing amqp.Publishing) { - log.Debugf("publishing message %+v to exchange %s with routing key %s", - amqpPublishing, exchange, routingKey) + log.Debugf("publishing message to exchange '%s' with routing key '%s'", + exchange, routingKey) publishChannel <- &rabtap.PublishMessage{ Exchange: exchange, @@ -44,49 +63,22 @@ func publishMessage(publishChannel rabtap.PublishChannel, Publishing: &amqpPublishing} } -// readSingleMessageFromRawFile reads a single messages from the given io.Reader -// which is typically stdin or a file. If reading from stdin, CTRL+D (linux) -// or CTRL+Z (Win) on an empty line terminates the reader. -func readSingleMessageFromRawFile(reader io.Reader) (amqp.Publishing, bool, error) { - buf, err := ioutil.ReadAll(reader) - return amqp.Publishing{Body: buf}, false, err -} - -// readNextMessageFromJSONStream reads JSON messages from the given decoder as long -// as there are messages available. -func readNextMessageFromJSONStream(decoder *json.Decoder) (amqp.Publishing, bool, error) { - message := RabtapPersistentMessage{} - err := decoder.Decode(&message) - if err != nil { - return amqp.Publishing{}, false, err +// selectOptionalOrDefault returns either an optional string, if set, or +// a default value. +func selectOptionalOrDefault(optionalStr *string, defaultStr string) string { + if optionalStr != nil { + return *optionalStr } - return message.ToAmqpPublishing(), true, nil + return defaultStr } -// createMessageReaderFunc returns a function that reads messages from the -// the given reader in JSON or raw-format -func createMessageReaderFunc(format string, reader io.ReadCloser) (MessageReaderFunc, error) { - switch format { - case "json-nopp": - fallthrough - case "json": - decoder := json.NewDecoder(reader) - return func() (amqp.Publishing, bool, error) { - return readNextMessageFromJSONStream(decoder) - }, nil - case "raw": - return func() (amqp.Publishing, bool, error) { - return readSingleMessageFromRawFile(reader) - }, nil - } - return nil, fmt.Errorf("invaild format %s", format) -} - -// publishMessages reads messages with the provided readNextMessageFunc and -// publishes the messages to the given exchange. When done closes -// the publishChannel +// publishMessageStream publishes messages from the provided message stream +// provided by readNextMessageFunc. When done closes the publishChannel func publishMessageStream(publishChannel rabtap.PublishChannel, - exchange, routingKey string, readNextMessageFunc MessageReaderFunc) error { + optExchange, optRoutingKey *string, + readNextMessageFunc MessageReaderFunc, + delayFunc DelayFunc) error { + var lastMsg *RabtapPersistentMessage for { msg, more, err := readNextMessageFunc() switch err { @@ -94,7 +86,11 @@ func publishMessageStream(publishChannel rabtap.PublishChannel, close(publishChannel) return nil case nil: - publishMessage(publishChannel, exchange, routingKey, msg) + delayFunc(lastMsg, &msg) + routingKey := selectOptionalOrDefault(optRoutingKey, msg.RoutingKey) + exchange := selectOptionalOrDefault(optExchange, msg.Exchange) + publishMessage(publishChannel, exchange, routingKey, msg.ToAmqpPublishing()) + lastMsg = &msg default: close(publishChannel) return err @@ -123,14 +119,20 @@ func cmdPublish(ctx context.Context, cmd CmdPublishArg) error { publisher := rabtap.NewAmqpPublish(cmd.amqpURI, cmd.tlsConfig, log) publishChannel := make(rabtap.PublishChannel) + delayFunc := func(first, second *RabtapPersistentMessage) { + delay := durationBetweenMessages(first, second, cmd.speed, cmd.fixedDelay) + log.Infof("sleeping for %s", delay) + time.Sleep(delay) // TODO make interuptable + } + go func() { // runs as long as readerFunc returns messages. Unfortunately, we // can not stop a blocking read on a file like we do with channels // and select. So we don't put the goroutine in the error group to // avoid blocking when e.g. the user presses CTRL+S and then CTRL+C. - // TODO come up with better solution + // TODO find better solution errChan <- publishMessageStream(publishChannel, cmd.exchange, - cmd.routingKey, cmd.readerFunc) + cmd.routingKey, cmd.readerFunc, delayFunc) }() g.Go(func() error { diff --git a/cmd/rabtap/cmd_publish_test.go b/cmd/rabtap/cmd_publish_test.go index 930c7cb..606e956 100644 --- a/cmd/rabtap/cmd_publish_test.go +++ b/cmd/rabtap/cmd_publish_test.go @@ -1,22 +1,135 @@ -// Copyright (C) 2017 Jan Delgado +// Copyright (C) 2017-2020 Jan Delgado // +build integration package main import ( + "context" + "crypto/tls" + "errors" + "io" "io/ioutil" "os" + "path/filepath" "testing" "time" + rabtap "github.com/jandelgado/rabtap/pkg" "github.com/jandelgado/rabtap/pkg/testcommon" "github.com/streadway/amqp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func TestCmdPublishRaw(t *testing.T) { +func TestMultDurationReturnsCorrectValue(t *testing.T) { + assert.Equal(t, time.Duration(50), multDuration(time.Duration(100), 0.5)) +} + +func TestDurationBetweenMessagesReturnsZeroIfAnyOfTheArgumentsIsNil(t *testing.T) { + msg := RabtapPersistentMessage{XRabtapReceivedTimestamp: time.Now()} + fixed := time.Duration(123) + for _, delay := range []*time.Duration{nil, &fixed} { + assert.Equal(t, time.Duration(0), durationBetweenMessages(&msg, nil, 1., delay)) + assert.Equal(t, time.Duration(0), durationBetweenMessages(nil, &msg, 1., delay)) + assert.Equal(t, time.Duration(0), durationBetweenMessages(nil, nil, 1., delay)) + } +} + +func TestDurationBetweenMessagesReturnsFixedDurationIfSet(t *testing.T) { + msg := RabtapPersistentMessage{XRabtapReceivedTimestamp: time.Now()} + fixed := time.Duration(123) + assert.Equal(t, time.Duration(123), durationBetweenMessages(&msg, &msg, 1., &fixed)) +} + +func TestDurationBetweenMessagesCorrectlyScalesDuration(t *testing.T) { + first := time.Unix(0, 0) + second := time.Unix(0, 1000) + assert.Equal(t, time.Duration(500), durationBetweenMessages( + &RabtapPersistentMessage{XRabtapReceivedTimestamp: first}, + &RabtapPersistentMessage{XRabtapReceivedTimestamp: second}, + .5, nil)) +} + +func TestDurationBetweenMessagesIsCalculatedCorrectly(t *testing.T) { + first := time.Unix(0, 0) + second := time.Unix(0, 1000) + assert.Equal(t, time.Duration(1000), durationBetweenMessages( + &RabtapPersistentMessage{XRabtapReceivedTimestamp: first}, + &RabtapPersistentMessage{XRabtapReceivedTimestamp: second}, + 1., nil)) +} + +func TestSelectOptionOrDefaultReturnsOptionalIfSet(t *testing.T) { + opt := "optional" + assert.Equal(t, "optional", selectOptionalOrDefault(&opt, "default")) +} + +func TestSelectOptionOrDefaultReturnsDefaultIfOptionalIsNil(t *testing.T) { + assert.Equal(t, "default", selectOptionalOrDefault(nil, "default")) +} + +func TestPublishMessageStreamPublishesNextMessage(t *testing.T) { + mockReader := func() (RabtapPersistentMessage, bool, error) { + return RabtapPersistentMessage{Body: []byte("hello")}, false, nil + } + delayer := func(first, second *RabtapPersistentMessage) {} + + pubCh := make(rabtap.PublishChannel, 1) + exchange := "exchange" + routingKey := "key" + err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer) + + assert.Nil(t, err) + select { + case message := <-pubCh: + assert.Equal(t, "exchange", message.Exchange) + assert.Equal(t, "key", message.RoutingKey) + assert.Equal(t, "hello", string(message.Publishing.Body)) + case <-time.After(time.Second * 2): + assert.Fail(t, "did not receive message within expected time") + } + // expect channel to be closed now + select { + case _, more := <-pubCh: + assert.False(t, more) + case <-time.After(time.Second * 2): + assert.Fail(t, "did not receive message within expected time") + } +} + +func TestPublishMessageStreamPropagatesMessageReadError(t *testing.T) { + mockReader := func() (RabtapPersistentMessage, bool, error) { + return RabtapPersistentMessage{}, false, errors.New("error") + } + delayer := func(first, second *RabtapPersistentMessage) {} + + pubCh := make(rabtap.PublishChannel) + exchange := "" + routingKey := "" + err := publishMessageStream(pubCh, &exchange, &routingKey, mockReader, delayer) + assert.Equal(t, errors.New("error"), err) +} + +func TestCmdPublishFailsWithInvaludBrokerURI(t *testing.T) { + ctx := context.Background() + + tlsConfig := &tls.Config{} + dum := "" + args := CmdPublishArg{ + amqpURI: "invalid", + exchange: &dum, + routingKey: &dum, + tlsConfig: tlsConfig, + readerFunc: func() (RabtapPersistentMessage, bool, error) { + return RabtapPersistentMessage{}, false, io.EOF + }} + err := cmdPublish(ctx, args) + assert.NotNil(t, err) +} + +func TestCmdPublishARawFileWithExchangeAndRoutingKey(t *testing.T) { + // integrative test publishing a raw file tmpfile, err := ioutil.TempFile("", "rabtap") require.Nil(t, err) @@ -46,7 +159,7 @@ func TestCmdPublishRaw(t *testing.T) { // message is in tmpfile.Name() os.Args = []string{"rabtap", "pub", "--uri", testcommon.IntegrationURIFromEnv(), - "exchange", + "--exchange=exchange", tmpfile.Name(), "--routingkey", routingKey} @@ -62,12 +175,18 @@ func TestCmdPublishRaw(t *testing.T) { } } -func TestCmdPublishJSON(t *testing.T) { +func TestCmdPublishAJSONFileWithIncludedRoutingKeyAndExchange(t *testing.T) { + + conn, ch := testcommon.IntegrationTestConnection(t, "myexchange", "topic", 1, false) + defer conn.Close() + + queueName := testcommon.IntegrationQueueName(0) + routingKey := queueName - // in the test we send a stream of 2 messages. + // in the integrative test we send a stream of 2 messages. // note: base64dec("aGVsbG8=") == "hello" // base64dec("c2Vjb25kCg==") == "second\n" - testmessage := ` + testmessages := ` { "Headers": null, "ContentType": "text/plain", @@ -84,27 +203,29 @@ func TestCmdPublishJSON(t *testing.T) { "AppID": "rabtap.testgen", "DeliveryTag": 63, "Redelivered": false, - "Exchange": "amq.topic", - "RoutingKey": "test-q-amq.topic-0", - "Body": "aGVsbG8=" + "Exchange": "myexchange", + "RoutingKey": "` + routingKey + `", + "Body": "aGVsbG8=", + "XRabtapReceivedTimestamp": "2017-10-28T23:45:33+02:00" } { - "Body": "c2Vjb25kCg==" + "Exchange": "myexchange", + "RoutingKey": "` + routingKey + `", + "Body": "c2Vjb25kCg==", + "XRabtapReceivedTimestamp": "2017-10-28T23:45:34+02:00" + } + { + "Body": "c2Vjb25kCg==", + "XRabtapReceivedTimestamp": "2017-10-28T23:45:35+02:00" }` tmpfile, err := ioutil.TempFile("", "rabtap") require.Nil(t, err) defer os.Remove(tmpfile.Name()) - _, err = tmpfile.Write([]byte(testmessage)) + _, err = tmpfile.Write([]byte(testmessages)) require.Nil(t, err) - conn, ch := testcommon.IntegrationTestConnection(t, "exchange", "topic", 1, false) - defer conn.Close() - - queueName := testcommon.IntegrationQueueName(0) - routingKey := queueName - deliveries, err := ch.Consume( queueName, "test-consumer", @@ -120,13 +241,12 @@ func TestCmdPublishJSON(t *testing.T) { defer func() { os.Args = oldArgs }() // execution: run publish command through call of main(), the actual - // message is in tmpfile.Name() + // message is in tmpfile.Name(). We expect exchange and routingkey to + // be taken from the JSON metadata as they are not specified. os.Args = []string{"rabtap", "pub", "--uri", testcommon.IntegrationURIFromEnv(), - "exchange", tmpfile.Name(), - "--routingkey", routingKey, - "--json"} + "--format=json"} main() // verification: we expect 2 messages to be sent by above call @@ -139,11 +259,70 @@ func TestCmdPublishJSON(t *testing.T) { } } - assert.Equal(t, "exchange", message[0].Exchange) + assert.Equal(t, "myexchange", message[0].Exchange) assert.Equal(t, routingKey, message[0].RoutingKey) assert.Equal(t, "hello", string(message[0].Body)) - assert.Equal(t, "exchange", message[1].Exchange) + assert.Equal(t, "myexchange", message[1].Exchange) assert.Equal(t, routingKey, message[1].RoutingKey) assert.Equal(t, "second\n", string(message[1].Body)) } + +func TestCmdPublishFilesFromDirectory(t *testing.T) { + // publish message stored in a directory as separate files (json-metadata + // and raw message file) + + conn, ch := testcommon.IntegrationTestConnection(t, "myexchange", "topic", 1, false) + defer conn.Close() + + queueName := testcommon.IntegrationQueueName(0) + routingKey := queueName + + msg := `{ "Exchange": "myexchange", "RoutingKey": "` + routingKey + `", "Body": "ixxx" }` + + dir, err := ioutil.TempDir("", "") + require.Nil(t, err) + defer os.RemoveAll(dir) + + err = ioutil.WriteFile(filepath.Join(dir, "rabtap-1.json"), []byte(msg), 0666) + require.Nil(t, err) + err = ioutil.WriteFile(filepath.Join(dir, "rabtap-1.dat"), []byte("Hello123"), 0666) + require.Nil(t, err) + + deliveries, err := ch.Consume( + queueName, + "test-consumer", + false, // noAck + true, // exclusive + false, // noLocal + false, // noWait + nil, // arguments + ) + require.Nil(t, err) + + oldArgs := os.Args + defer func() { os.Args = oldArgs }() + + // execution: run publish command through call of main(), the actual + // message is read from the provided directory. We expect exchange and + // routingkey to be taken from the JSON metadata as they are not specified. + os.Args = []string{"rabtap", "pub", + "--uri", testcommon.IntegrationURIFromEnv(), + dir, + "--format=raw"} + main() + + // verification: we expect 1 messages to be sent by above call + var message [1]amqp.Delivery + for i := 0; i < 1; i++ { + select { + case message[i] = <-deliveries: + case <-time.After(time.Second * 2): + assert.Fail(t, "did not receive message within expected time") + } + } + + assert.Equal(t, "myexchange", message[0].Exchange) + assert.Equal(t, routingKey, message[0].RoutingKey) + assert.Equal(t, "Hello123", string(message[0].Body)) +} diff --git a/cmd/rabtap/cmd_queue_test.go b/cmd/rabtap/cmd_queue_test.go index 7dd3ecc..bad18f2 100644 --- a/cmd/rabtap/cmd_queue_test.go +++ b/cmd/rabtap/cmd_queue_test.go @@ -17,10 +17,10 @@ import ( "github.com/stretchr/testify/require" ) -func TestCmdPurgeQueue(t *testing.T) { +func TestIntegrationCmdQueueCreatePurgeiBindUnbindQueue(t *testing.T) { - // create a queue, publish some messages, purge queue and make - // sure queue is empty + // integration tests queue creation, bind to exchange, purge, + // unbdind from exchange via calls through the main method oldArgs := os.Args defer func() { os.Args = oldArgs }() @@ -32,14 +32,10 @@ func TestCmdPurgeQueue(t *testing.T) { amqpURI := testcommon.IntegrationURIFromEnv() apiURL, _ := url.Parse(testcommon.IntegrationAPIURIFromEnv()) - os.Args = []string{"rabtap", "queue", - "create", testQueue, - "--uri", amqpURI} + os.Args = []string{"rabtap", "queue", "create", testQueue, "--uri", amqpURI} main() - os.Args = []string{"rabtap", "queue", - "bind", testQueue, - "to", testExchange, + os.Args = []string{"rabtap", "queue", "bind", testQueue, "to", testExchange, "--bindingkey", testQueue, "--uri", amqpURI} main() @@ -47,12 +43,12 @@ func TestCmdPurgeQueue(t *testing.T) { // TODO publish some messages // purge queue and check size - os.Args = []string{"rabtap", "queue", - "purge", testQueue, - "--uri", amqpURI} + os.Args = []string{"rabtap", "queue", "purge", testQueue, "--uri", amqpURI} main() time.Sleep(2 * time.Second) + + // TODO add a simple client to testcommon client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{}) queues, err := client.Queues() assert.Nil(t, err) @@ -63,10 +59,15 @@ func TestCmdPurgeQueue(t *testing.T) { queue := queues[i] assert.Equal(t, 0, queue.Messages) - // remove queue - os.Args = []string{"rabtap", "queue", - "rm", testQueue, + // unbind queue + os.Args = []string{"rabtap", "queue", "unbind", testQueue, "from", testExchange, + "--bindingkey", testQueue, "--uri", amqpURI} main() + // remove queue + os.Args = []string{"rabtap", "queue", "rm", testQueue, "--uri", amqpURI} + main() + + // TODO check that queue is removed } diff --git a/cmd/rabtap/cmd_replay_test.go b/cmd/rabtap/cmd_replay_test.go new file mode 100644 index 0000000..8d3dcaf --- /dev/null +++ b/cmd/rabtap/cmd_replay_test.go @@ -0,0 +1,3 @@ +// Copyright (C) 2019 Jan Delgado + +package main diff --git a/cmd/rabtap/cmd_subscribe.go b/cmd/rabtap/cmd_subscribe.go index 25f5971..ff88608 100644 --- a/cmd/rabtap/cmd_subscribe.go +++ b/cmd/rabtap/cmd_subscribe.go @@ -27,12 +27,9 @@ func cmdSubscribe(ctx context.Context, cmd CmdSubscribeArg) error { g, ctx := errgroup.WithContext(ctx) - // this channel is used to decouple message receiving threads - // with the main thread, which does the actual message processing messageChannel := make(rabtap.TapChannel) config := rabtap.AmqpSubscriberConfig{Exclusive: false, AutoAck: cmd.AutoAck} subscriber := rabtap.NewAmqpSubscriber(config, cmd.amqpURI, cmd.tlsConfig, log) - //defer subscriber.Close() g.Go(func() error { return subscriber.EstablishSubscription(ctx, cmd.queue, messageChannel) }) g.Go(func() error { return messageReceiveLoop(ctx, messageChannel, cmd.messageReceiveFunc) }) diff --git a/cmd/rabtap/cmd_subscribe_test.go b/cmd/rabtap/cmd_subscribe_test.go index 54c5dc5..fb96fbe 100644 --- a/cmd/rabtap/cmd_subscribe_test.go +++ b/cmd/rabtap/cmd_subscribe_test.go @@ -11,6 +11,8 @@ import ( "context" "crypto/tls" "io" + "os" + "syscall" "testing" "time" @@ -18,6 +20,7 @@ import ( "github.com/jandelgado/rabtap/pkg/testcommon" "github.com/streadway/amqp" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) { @@ -46,8 +49,10 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) { func TestCmdSub(t *testing.T) { const testMessage = "SubHello" const testQueue = "sub-queue-test" - const testKey = testQueue - const testExchange = "sub-exchange-test" + testKey := testQueue + + testExchange := "" + // testExchange := "sub-exchange-test" tlsConfig := &tls.Config{} amqpURI := testcommon.IntegrationURIFromEnv() @@ -62,16 +67,9 @@ func TestCmdSub(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - // creat exchange - cmdExchangeCreate(CmdExchangeCreateArg{amqpURI: amqpURI, - exchange: testExchange, exchangeType: "fanout", - durable: false, tlsConfig: tlsConfig}) - defer cmdExchangeRemove(amqpURI, testExchange, tlsConfig) - // create and bind queue cmdQueueCreate(CmdQueueCreateArg{amqpURI: amqpURI, queue: testQueue, tlsConfig: tlsConfig}) - cmdQueueBindToExchange(amqpURI, testQueue, testKey, testExchange, tlsConfig) defer cmdQueueRemove(amqpURI, testQueue, tlsConfig) // subscribe to testQueue @@ -84,34 +82,77 @@ func TestCmdSub(t *testing.T) { time.Sleep(time.Second * 1) messageCount := 0 + + // TODO test without cmdPublish cmdPublish( ctx, CmdPublishArg{ amqpURI: amqpURI, - exchange: testExchange, - routingKey: testKey, + exchange: &testExchange, + routingKey: &testKey, tlsConfig: tlsConfig, - readerFunc: func() (amqp.Publishing, bool, error) { + readerFunc: func() (RabtapPersistentMessage, bool, error) { // provide exactly one message if messageCount > 0 { - return amqp.Publishing{}, false, io.EOF + return RabtapPersistentMessage{}, false, io.EOF } messageCount++ - return amqp.Publishing{ + return RabtapPersistentMessage{ Body: []byte(testMessage), ContentType: "text/plain", DeliveryMode: amqp.Transient, }, true, nil }}) - // test if our tap received the message + // test if we received the message select { case <-done: case <-time.After(time.Second * 2): assert.Fail(t, "did not receive message within expected time") } cancel() // stop cmdSubscribe() +} + +func TestCmdSubIntegration(t *testing.T) { + const testMessage = "SubHello" + const testQueue = "sub-queue-test" + testKey := testQueue + testExchange := "" // default exchange + + tlsConfig := &tls.Config{} + amqpURI := testcommon.IntegrationURIFromEnv() + + cmdQueueCreate(CmdQueueCreateArg{amqpURI: amqpURI, + queue: testQueue, tlsConfig: tlsConfig}) + defer cmdQueueRemove(amqpURI, testQueue, tlsConfig) + + _, ch := testcommon.IntegrationTestConnection(t, "", "", 0, false) + err := ch.Publish( + testExchange, + testKey, + false, // mandatory + false, // immediate + amqp.Publishing{ + Body: []byte("Hello"), + ContentType: "text/plain", + DeliveryMode: amqp.Transient, + Headers: amqp.Table{}, + }) + require.Nil(t, err) + + go func() { + time.Sleep(time.Second * 2) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + }() + + oldArgs := os.Args + defer func() { os.Args = oldArgs }() + os.Args = []string{"rabtap", "sub", + "--uri", testcommon.IntegrationURIFromEnv(), + testQueue, + "--format=raw", + "--no-color"} + output := testcommon.CaptureOutput(main) - cmdQueueUnbindFromExchange(amqpURI, testQueue, testKey, testExchange, tlsConfig) - // TODO check that queue is unbound + assert.Regexp(t, "(?s).*message received.*\nroutingkey.....: sub-queue-test\n.*Hello", output) } diff --git a/cmd/rabtap/cmd_tap.go b/cmd/rabtap/cmd_tap.go index d1a6ee9..0253ef4 100644 --- a/cmd/rabtap/cmd_tap.go +++ b/cmd/rabtap/cmd_tap.go @@ -19,14 +19,10 @@ func cmdTap(ctx context.Context, tapConfig []rabtap.TapConfiguration, tlsConfig g, ctx := errgroup.WithContext(ctx) - // this channel is used to decouple message receiving threads - // with the main thread, which does the actual message processing tapMessageChannel := make(rabtap.TapChannel) - // taps := []*rabtap.AmqpTap{} for _, config := range tapConfig { tap := rabtap.NewAmqpTap(config.AmqpURI, tlsConfig, log) - // taps = append(taps, tap) g.Go(func() error { return tap.EstablishTap(ctx, config.Exchanges, tapMessageChannel) }) diff --git a/cmd/rabtap/cmd_tap_test.go b/cmd/rabtap/cmd_tap_test.go index 597c98a..8488b07 100644 --- a/cmd/rabtap/cmd_tap_test.go +++ b/cmd/rabtap/cmd_tap_test.go @@ -7,6 +7,8 @@ package main import ( "context" "crypto/tls" + "os" + "syscall" "testing" "time" @@ -63,3 +65,39 @@ func TestCmdTap(t *testing.T) { } cancel() // stop cmdTap() } + +func TestCmdTapIntegration(t *testing.T) { + const testMessage = "TapHello" + const testQueue = "tap-queue-test" + testKey := testQueue + testExchange := "amq.topic" + + go func() { + time.Sleep(time.Second * 1) + _, ch := testcommon.IntegrationTestConnection(t, "", "", 0, false) + err := ch.Publish( + testExchange, + testKey, + false, // mandatory + false, // immediate + amqp.Publishing{ + Body: []byte("Hello"), + ContentType: "text/plain", + DeliveryMode: amqp.Transient, + Headers: amqp.Table{}, + }) + require.Nil(t, err) + time.Sleep(time.Second * 1) + syscall.Kill(syscall.Getpid(), syscall.SIGINT) + }() + + oldArgs := os.Args + defer func() { os.Args = oldArgs }() + os.Args = []string{"rabtap", "tap", + "--uri", testcommon.IntegrationURIFromEnv(), + "amq.topic:" + testKey, + "--format=raw", + "--no-color"} + output := testcommon.CaptureOutput(main) + assert.Regexp(t, "(?s).*message received.*\nroutingkey.....: tap-queue-test\n.*Hello", output) +} diff --git a/cmd/rabtap/command_line.go b/cmd/rabtap/command_line.go index 2fc3f3c..3a343eb 100644 --- a/cmd/rabtap/command_line.go +++ b/cmd/rabtap/command_line.go @@ -1,5 +1,6 @@ // command line parsing for rabtap // TODO split in per-command parsers +// TODO use docopt's bind feature to simplify mappings // Copyright (C) 2017-2019 Jan Delgado package main @@ -8,6 +9,8 @@ import ( "errors" "fmt" "os" + "strconv" + "time" docopt "github.com/docopt/docopt-go" rabtap "github.com/jandelgado/rabtap/pkg" @@ -18,7 +21,7 @@ import ( var RabtapAppVersion = "(version not specified)" const ( - // note: usage is interpreted by docopt - this is code. + // note: usage is DSL interpreted by docopt - this is code. Change carefully. usage = `rabtap - RabbitMQ wire tap. github.com/jandelgado/rabtap Usage: @@ -28,7 +31,8 @@ Usage: rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [-jknsv] rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [-jknsv] rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn] - rabtap pub [--uri=URI] EXCHANGE [FILE] [--routingkey=KEY] [--format=FORMAT] [-jkv] + rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT] + [--delay=DELAY | --speed=FACTOR] [-jkv] rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv] rabtap exchange rm EXCHANGE [--uri=URI] [-kv] rabtap queue create QUEUE [--uri=URI] [-adkv] @@ -39,20 +43,26 @@ Usage: rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv] rabtap --version -Options: +Arguments and options: EXCHANGES comma-separated list of exchanges and binding keys, e.g. amq.topic:# or exchange1:key1,exchange2:key2. EXCHANGE name of an exchange, e.g. amq.direct. - FILE file to publish in pub mode. If omitted, stdin will be read. + SOURCE file or directory to publish in pub mode. If omitted, stdin will be read. QUEUE name of a queue. CONNECTION name of a connection. + DIR directory to read messages from. -a, --autodelete create auto delete exchange/queue. --api=APIURI connect to given API server. If APIURI is omitted, the environment variable RABTAP_APIURI will be used. -b, --bindingkey=KEY binding key to use in bind queue command. --by-connection output of info command starts with connections. --consumers include consumers and connections in output of info command. + --delay=DELAY Time to wait between sending messages during publish. + If not set then messages will be delayed as recorded. + The value must be suffixed with a time unit, e.g. ms, s etc. -d, --durable create durable exchange/queue. + --exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will + be taken from message being published (see JSON message format). --filter=EXPR Predicate for info command to filter queues [default: true] --format=FORMAT * for tap, pub, sub command: format to write/read messages to console and optionally to file (when --saveto DIR is given). @@ -70,10 +80,13 @@ Options: when the channel is closed. --omit-empty don't show echanges without bindings in info command. --reason=REASON reason why the connection was closed [default: closed by rabtap]. - -r, --routingkey=KEY routing key to use in publish mode. + -r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key + will be taken from message being published (see JSON + message format). --saveto=DIR also save messages and metadata to DIR. --show-default include default exchange in output info command. -s, --silent suppress message output to stdout. + --speed=FACTOR Speed factor to use during publish [default: 1.0]. --stats include statistics in output of info command. -t, --type=TYPE exchange type [default: fanout]. --uri=URI connect to given AQMP broker. If omitted, the @@ -98,7 +111,7 @@ Examples: # use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api export RABTAP_APIURI=http://guest:guest@localhost:15672/api rabtap info - rabtap info --filter "binding.Source == 'amq.topic'" -o + rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672" ` ) @@ -141,7 +154,7 @@ type commonArgs struct { } // CommandLineArgs represents the parsed command line arguments -// TODO does not scale well - split in per-cmd structs? +// TODO does not scale well - split in per-cmd structs type CommandLineArgs struct { Cmd ProgramCmd commonArgs @@ -149,28 +162,29 @@ type CommandLineArgs struct { TapConfig []rabtap.TapConfiguration // configuration in tap mode APIURI string - PubExchange string // pub mode: exchange to publish to - PubRoutingKey string // pub mode: routing key, defaults to "" - PubFile *string // pub mode: file to send - AutoAck bool // sub mode: auto ack enabled - QueueName string // queue create, remove, bind, sub - QueueBindingKey string // queue bind - ExchangeName string // exchange name create, remove or queue bind - ExchangeType string // exchange type create, remove or queue bind - ShowConsumers bool // info mode: also show consumer - InfoMode string // info mode: byExchange, byConnection - ShowStats bool // info mode: also show statistics - QueueFilter string // info mode: optional filter predicate - OmitEmptyExchanges bool // info mode: do not show exchanges wo/ bindings - ShowDefaultExchange bool // info mode: show default exchange - Format string // output format, depends on command - Durable bool // queue create, exchange create - Autodelete bool // queue create, exchange create - SaveDir *string // save mode: optional directory to stores files to - Silent bool // suppress message printing - - ConnName string // conn mode: name of connection - CloseReason string // conn mode: reason of close + PubExchange *string // pub: exchange to publish to + PubRoutingKey *string // pub: routing key, defaults to "" + Source *string // pub: file to send + Speed float64 // pub: speed factor + Delay *time.Duration // pub: fixed delay in ms + AutoAck bool // sub: auto ack enabled + QueueName string // queue create, remove, bind, sub + QueueBindingKey string // queue bind + ExchangeName string // exchange name create, remove or queue bind + ExchangeType string // exchange type create, remove or queue bind + ShowConsumers bool // info: also show consumer + InfoMode string // info: byExchange, byConnection + ShowStats bool // info: also show statistics + QueueFilter string // info: optional filter predicate + OmitEmptyExchanges bool // info: do not show exchanges wo/ bindings + ShowDefaultExchange bool // info: show default exchange + Format string // output format, depends on command + Durable bool // queue create, exchange create + Autodelete bool // queue create, exchange create + SaveDir *string // save: optional directory to stores files to + Silent bool // suppress message printing + ConnName string // conn: name of connection + CloseReason string // conn: reason of close } // getAmqpURI returns the ith entry of amqpURIs array or the value @@ -371,13 +385,30 @@ func parsePublishCmdArgs(args map[string]interface{}) (CommandLineArgs, error) { if result.AmqpURI, err = parseAmqpURI(args); err != nil { return result, err } - result.PubExchange = args["EXCHANGE"].(string) + if args["--exchange"] != nil { + exchange := args["--exchange"].(string) + result.PubExchange = &exchange + } if args["--routingkey"] != nil { - result.PubRoutingKey = args["--routingkey"].(string) + routingKey := args["--routingkey"].(string) + result.PubRoutingKey = &routingKey + } + if args["SOURCE"] != nil { + file := args["SOURCE"].(string) + result.Source = &file } - if args["FILE"] != nil { - file := args["FILE"].(string) - result.PubFile = &file + if args["--delay"] != nil { + delay, err := time.ParseDuration(args["--delay"].(string)) + if err != nil { + return result, err + } + result.Delay = &delay + } + if args["--speed"] != nil { + result.Speed, err = strconv.ParseFloat(args["--speed"].(string), 64) + if err != nil { + return result, err + } } return result, nil } diff --git a/cmd/rabtap/command_line_test.go b/cmd/rabtap/command_line_test.go index 1a34398..6a4ee07 100644 --- a/cmd/rabtap/command_line_test.go +++ b/cmd/rabtap/command_line_test.go @@ -7,6 +7,7 @@ package main import ( "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -257,16 +258,36 @@ func TestCliInfoCmdAllOptionsAreSet(t *testing.T) { assert.True(t, args.OmitEmptyExchanges) } -func TestCliPubCmdFromFile(t *testing.T) { +func TestCliPubCmdFromFileMinimalOptsSet(t *testing.T) { args, err := ParseCommandLineArgs( - []string{"pub", "--uri=broker", "exchange", "file", "--routingkey", "key"}) + []string{"pub", "--uri=broker"}) assert.Nil(t, err) assert.Equal(t, PubCmd, args.Cmd) assert.Equal(t, "broker", args.AmqpURI) - assert.Equal(t, "exchange", args.PubExchange) - assert.Equal(t, "file", *args.PubFile) - assert.Equal(t, "key", args.PubRoutingKey) + assert.Nil(t, args.PubExchange) + assert.Nil(t, args.Source) + assert.Nil(t, args.PubRoutingKey) + assert.Equal(t, "raw", args.Format) + assert.Nil(t, args.Delay) + assert.Equal(t, 1., args.Speed) + assert.False(t, args.Verbose) + assert.False(t, args.InsecureTLS) +} +func TestCliPubCmdFromFileAllOptsSet(t *testing.T) { + args, err := ParseCommandLineArgs( + []string{"pub", "--uri=broker", "--exchange=exchange", "file", + "--routingkey=key", "--delay=5s", "--format=json"}) + + assert.Nil(t, err) + assert.Equal(t, PubCmd, args.Cmd) + assert.Equal(t, "broker", args.AmqpURI) + assert.Equal(t, "exchange", *args.PubExchange) + assert.Equal(t, "file", *args.Source) + assert.Equal(t, "key", *args.PubRoutingKey) + assert.Equal(t, "json", args.Format) + assert.Equal(t, 5*time.Second, *args.Delay) + assert.Equal(t, 1., args.Speed) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) } @@ -276,50 +297,70 @@ func TestCliPubCmdUriFromEnv(t *testing.T) { os.Setenv(key, "URI") defer os.Unsetenv(key) args, err := ParseCommandLineArgs( - []string{"pub", "exchange"}) + []string{"pub"}) assert.Nil(t, err) assert.Equal(t, PubCmd, args.Cmd) assert.Equal(t, "URI", args.AmqpURI) - assert.Equal(t, "exchange", args.PubExchange) } -func TestCliPubCmdMissingUri(t *testing.T) { +func TestCliPubCmdMissingUriReturnsError(t *testing.T) { const key = "RABTAP_AMQPURI" os.Unsetenv(key) - _, err := ParseCommandLineArgs( - []string{"pub", "exchange"}) + _, err := ParseCommandLineArgs([]string{"pub"}) + assert.NotNil(t, err) +} + +func TestCliPubCmdInvalidDelayReturnsError(t *testing.T) { + _, err := ParseCommandLineArgs([]string{"pub", "--uri=uri", "--delay=invalid"}) + assert.NotNil(t, err) +} + +func TestCliPubCmdInvalidSpeedupReturnsError(t *testing.T) { + _, err := ParseCommandLineArgs([]string{"pub", "--uri=uri", "--speed=invalid"}) + assert.NotNil(t, err) +} + +func TestCliPubCmdInvalidFormatReturnsError(t *testing.T) { + _, err := ParseCommandLineArgs([]string{"pub", "--uri=uri", "--format=invalid"}) assert.NotNil(t, err) } func TestCliPubCmdFromStdinWithRoutingKeyJsonFormat(t *testing.T) { args, err := ParseCommandLineArgs( - []string{"pub", "--uri=broker1", "exchange1", "--routingkey=key", "--format=json"}) + []string{"pub", "--uri=broker1", "--exchange=exchange1", "--routingkey=key", "--format=json"}) assert.Nil(t, err) assert.Equal(t, PubCmd, args.Cmd) assert.Equal(t, "broker1", args.AmqpURI) - assert.Equal(t, "exchange1", args.PubExchange) - assert.Nil(t, args.PubFile) + assert.Equal(t, "exchange1", *args.PubExchange) + assert.Equal(t, "key", *args.PubRoutingKey) + assert.Nil(t, args.Source) assert.Equal(t, "json", args.Format) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) } -func TestCliPubCmdFromStdinWithRoutingKeyJsonFormatDeprecated(t *testing.T) { +func TestCliPubCmdFromStdinWithJsonFormatDeprecated(t *testing.T) { args, err := ParseCommandLineArgs( - []string{"pub", "--uri=broker1", "exchange1", "--routingkey=key", "--json"}) + []string{"pub", "--uri=broker1", "--json"}) assert.Nil(t, err) assert.Equal(t, PubCmd, args.Cmd) assert.Equal(t, "broker1", args.AmqpURI) - assert.Equal(t, "exchange1", args.PubExchange) - assert.Nil(t, args.PubFile) + assert.Nil(t, args.PubExchange) + assert.Nil(t, args.PubRoutingKey) + assert.Nil(t, args.Source) assert.Equal(t, "json", args.Format) assert.False(t, args.Verbose) assert.False(t, args.InsecureTLS) } +func TestCliSubCmdInvalidFormatReturnsError(t *testing.T) { + _, err := ParseCommandLineArgs([]string{"sub", "queue", "--uri=uri", "--format=invalid"}) + assert.NotNil(t, err) +} + func TestCliSubCmdSaveToDir(t *testing.T) { args, err := ParseCommandLineArgs( []string{"sub", "queuename", "--uri", "uri", "--saveto", "dir"}) diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index fc21105..5ed8564 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -6,9 +6,11 @@ import ( "context" "crypto/tls" "fmt" + "io/ioutil" "net/url" "os" "os/signal" + "sort" "time" //"net/http" @@ -41,10 +43,10 @@ func failOnError(err error, msg string, exitFunc func(int)) { } } -// defaultFilenameProvider returns the default filename to use when messages -// are saved to files during tap or subscribe. +// defaultFilenameProvider returns the default filename without extension to +// use when messages are saved to files during tap or subscribe. func defaultFilenameProvider() string { - return fmt.Sprintf("rabtap-%d.json", time.Now().UnixNano()) + return fmt.Sprintf("rabtap-%d", time.Now().UnixNano()) } func getTLSConfig(insecureTLS bool) *tls.Config { @@ -72,20 +74,53 @@ func startCmdInfo(args CommandLineArgs, title string) { out: NewColorableWriter(os.Stdout)}) } +// createMessageReaderForPublish returns a MessageReaderFunc that reads +// messages from the given source in the specified format. The source can +// be either empty (=stdin), a filename or a directory name +func createMessageReaderForPublishFunc(source *string, format string) (MessageReaderFunc, error) { + if source == nil { + return CreateMessageReaderFunc(format, os.Stdin) + } + + fi, err := os.Stat(*source) + if err != nil { + return nil, err + } + + if !fi.IsDir() { + file, err := os.Open(*source) + if err != nil { + return nil, err + } + // TODO close file + return CreateMessageReaderFunc(format, file) + } + + metadataFiles, err := LoadMetadataFilesFromDir(*source, ioutil.ReadDir, NewRabtapFileInfoPredicate()) + if err != nil { + return nil, err + } + + sort.SliceStable(metadataFiles, func(i, j int) bool { + return metadataFiles[i].metadata.XRabtapReceivedTimestamp.Before( + metadataFiles[j].metadata.XRabtapReceivedTimestamp) + }) + + return CreateMessageFromDirReaderFunc(format, metadataFiles) +} + func startCmdPublish(ctx context.Context, args CommandLineArgs) { - file := os.Stdin - if args.PubFile != nil { - var err error - file, err = os.Open(*args.PubFile) - failOnError(err, "error opening "+*args.PubFile, os.Exit) - defer file.Close() + if args.Format == "raw" && args.PubExchange == nil && args.PubRoutingKey == nil { + fmt.Fprint(os.Stderr, "Warning: using raw message format but neither exchange or routing key are set.\n") } - readerFunc, err := createMessageReaderFunc(args.Format, file) - failOnError(err, "options", os.Exit) + readerFunc, err := createMessageReaderForPublishFunc(args.Source, args.Format) + failOnError(err, "message-reader", os.Exit) err = cmdPublish(ctx, CmdPublishArg{ amqpURI: args.AmqpURI, exchange: args.PubExchange, routingKey: args.PubRoutingKey, + fixedDelay: args.Delay, + speed: args.Speed, tlsConfig: getTLSConfig(args.InsecureTLS), readerFunc: readerFunc}) failOnError(err, "error publishing message", os.Exit) diff --git a/cmd/rabtap/main_test.go b/cmd/rabtap/main_test.go index 313e9f7..394d6f5 100644 --- a/cmd/rabtap/main_test.go +++ b/cmd/rabtap/main_test.go @@ -19,6 +19,11 @@ func TestInitLogging(t *testing.T) { initLogging(false) } +func TestDefaultFilenameProviderReturnsFilenameInExpectedFormat(t *testing.T) { + fn := defaultFilenameProvider() + assert.Regexp(t, "^rabtap-[0-9]+$", fn) +} + func TestGetTLSConfig(t *testing.T) { tls := getTLSConfig(true) diff --git a/cmd/rabtap/message_reader_dir.go b/cmd/rabtap/message_reader_dir.go new file mode 100644 index 0000000..820368f --- /dev/null +++ b/cmd/rabtap/message_reader_dir.go @@ -0,0 +1,137 @@ +// read persisted metadata and messages from a directory +// Copyright (C) 2019 Jan Delgado + +package main + +import ( + "fmt" + "io/ioutil" + "os" + "path" + "regexp" + "strings" +) + +const metadataFilePattern = `^rabtap-[0-9]+.json$` + +type DirReader func(string) ([]os.FileInfo, error) +type FileInfoPredicate func(fileinfo os.FileInfo) bool + +type FilenameWithMetadata struct { + filename string + metadata RabtapPersistentMessage +} + +func filenameWithoutExtension(fn string) string { + return strings.TrimSuffix(fn, path.Ext(fn)) +} + +// newRabtapFileInfoPredicate returns a FileInfoPredicate that matches +// rabtap metadata files +func NewRabtapFileInfoPredicate() FileInfoPredicate { + filenameRe := regexp.MustCompile(metadataFilePattern) + return func(fi os.FileInfo) bool { + return fi.Mode().IsRegular() && filenameRe.MatchString(fi.Name()) + } +} + +func filterMetadataFilenames(fileinfos []os.FileInfo, pred FileInfoPredicate) []string { + var filenames []string + for _, fi := range fileinfos { + if pred(fi) { + filenames = append(filenames, fi.Name()) + } + } + return filenames +} + +// findMetadataFilenames returns list of absolute filenames looking like rabtap +// persisted message/metadata files +func findMetadataFilenames(dirname string, dirReader DirReader, pred FileInfoPredicate) ([]string, error) { + fileinfos, err := dirReader(dirname) + if err != nil { + return nil, err + } + return filterMetadataFilenames(fileinfos, pred), nil +} + +func readRabtapPersistentMessage(filename string) (RabtapPersistentMessage, error) { + file, err := os.Open(filename) + if err != nil { + return RabtapPersistentMessage{}, err + } + defer file.Close() + contents, err := readMessageFromJSON(file) + if err != nil { + return RabtapPersistentMessage{}, fmt.Errorf("error reading %s: %v", filename, err) + } + return contents, nil +} + +// readMetadataOfFiles reads all metadata files from the given list of files. +// returns an error if any error occurs. +func readMetadataOfFiles(dirname string, filenames []string) ([]FilenameWithMetadata, error) { + + data := make([]FilenameWithMetadata, len(filenames)) + for i, filename := range filenames { + fullpath := path.Join(dirname, filename) + msg, err := readRabtapPersistentMessage(fullpath) + if err != nil { + return data, err + } + // we will load the body later when the message is published (from the + // JSON or a separate message file). This approach reads message bodies + // twice, but this should not be a problem + msg.Body = []byte("") + data[i] = FilenameWithMetadata{filename: fullpath, metadata: msg} + } + + return data, nil +} + +// LoadMetadataFromDir loads all metadata files from the given directory +// passing the given predicate +func LoadMetadataFilesFromDir(dirname string, dirReader DirReader, pred FileInfoPredicate) ([]FilenameWithMetadata, error) { + filenames, err := findMetadataFilenames(dirname, dirReader, pred) + if err != nil { + return nil, err + } + return readMetadataOfFiles(dirname, filenames) +} + +// createMessageFromDirReaderFunc returns a MessageReaderFunc that reads +// messages from the given list of filenames in the given format. +func CreateMessageFromDirReaderFunc(format string, files []FilenameWithMetadata) (MessageReaderFunc, error) { + + curfile := 0 + + switch format { + case "json-nopp": + fallthrough + case "json": + return func() (RabtapPersistentMessage, bool, error) { + var message RabtapPersistentMessage + if curfile >= len(files) { + return message, false, nil + } + + message, err := readRabtapPersistentMessage(files[curfile].filename) + curfile++ + return message, curfile < len(files), err + }, nil + case "raw": + return func() (RabtapPersistentMessage, bool, error) { + var message RabtapPersistentMessage + if curfile >= len(files) { + return message, false, nil + } + rawFile := filenameWithoutExtension(files[curfile].filename) + ".dat" + body, err := ioutil.ReadFile(rawFile) + message = files[curfile].metadata + message.Body = body + curfile++ + return message, curfile < len(files), err + }, nil + } + return nil, fmt.Errorf("invaild format %s", format) +} diff --git a/cmd/rabtap/message_reader_dir_test.go b/cmd/rabtap/message_reader_dir_test.go new file mode 100644 index 0000000..5517e08 --- /dev/null +++ b/cmd/rabtap/message_reader_dir_test.go @@ -0,0 +1,283 @@ +// Copyright (C) 2019 Jan Delgado + +package main + +import ( + "errors" + "io/ioutil" + "os" + "path" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// a os.FileInfo implementation for tests +type fileInfoMock struct { + name string + mode os.FileMode +} + +func (s fileInfoMock) Name() string { + return s.name +} + +func (s fileInfoMock) Size() int64 { return 0 } + +func (s fileInfoMock) Mode() os.FileMode { + return s.mode +} + +func (s fileInfoMock) ModTime() time.Time { + return time.Now() +} + +func (s fileInfoMock) IsDir() bool { + return false +} +func (s fileInfoMock) Sys() interface{} { + return nil +} + +func newFileInfoMock(name string, mode os.FileMode) fileInfoMock { + return fileInfoMock{name, mode} +} + +func mockDirFiles() []os.FileInfo { + infos := []fileInfoMock{ + newFileInfoMock("somefile.txt", 0), + newFileInfoMock("rabtap-1234.json", 0), + newFileInfoMock("rabtap-9999.json", os.ModeDir), + newFileInfoMock("rabtap-1235.json", 0), + newFileInfoMock("xrabtap-1235.json", 0), + newFileInfoMock("rabtap-1235.invalid", 0), + } + result := make([]os.FileInfo, len(infos)) + for i, v := range infos { + result[i] = v + } + return result +} + +func mockDirReader(dirname string) ([]os.FileInfo, error) { + if dirname == "/rabtap" { + return mockDirFiles(), nil + } + return nil, errors.New("invalid directory") +} + +// writeTempFile creates a temporary file with data as it's content. The +// filename is returned. +func writeTempFile(t *testing.T, data string) string { + tmpFile, err := ioutil.TempFile(os.TempDir(), "") + require.Nil(t, err) + + defer tmpFile.Close() + + _, err = tmpFile.Write([]byte(data)) + require.Nil(t, err) + + return tmpFile.Name() +} + +func TestFilenameWithoutExtensionReturnsExpectedResult(t *testing.T) { + assert.Equal(t, "/some/file", filenameWithoutExtension("/some/file.ext")) + assert.Equal(t, "/some/file", filenameWithoutExtension("/some/file")) +} + +func TestRabtapFileInfoPredicateFiltersExpectedFiles(t *testing.T) { + p := NewRabtapFileInfoPredicate() + + assert.True(t, p(newFileInfoMock("rabtap-1234.json", 0))) + assert.True(t, p(newFileInfoMock("rabtap-1235.json", 0))) + + assert.False(t, p(newFileInfoMock("somefile.txt", 0))) + assert.False(t, p(newFileInfoMock("rabtap-9999.jsonx", 0))) + assert.False(t, p(newFileInfoMock("rabtap-9999.json", os.ModeDir))) +} + +func TestFilterMetadataFilesAppliesFilterCorretly(t *testing.T) { + pred := func(fi os.FileInfo) bool { + return fi.Name() == "rabtap-1234.json" + } + files := filterMetadataFilenames(mockDirFiles(), pred) + + assert.Equal(t, 1, len(files)) + assert.Equal(t, "rabtap-1234.json", files[0]) +} + +func TestFindMetadataFilenamesFindsAllRabtapMetadataFiles(t *testing.T) { + pred := func(fi os.FileInfo) bool { + return fi.Name() == "rabtap-1234.json" + } + filenames, err := findMetadataFilenames("/rabtap", mockDirReader, pred) + assert.Nil(t, err) + assert.Equal(t, []string{"rabtap-1234.json"}, filenames) +} + +func TestFindMetadataFilenamesReturnsErrorOnInvalidDirectory(t *testing.T) { + pred := func(fi os.FileInfo) bool { + return true + } + _, err := findMetadataFilenames("/invalid", mockDirReader, pred) + assert.NotNil(t, err) +} + +func TestReadMetadataFileReturnsErrorForNonExistingFile(t *testing.T) { + _, err := readRabtapPersistentMessage("/this/file/should/not/exist") + assert.NotNil(t, err) +} + +func TestLoadMetadataFilesFromDirReturnsExpectedMetadata(t *testing.T) { + pred := func(fi os.FileInfo) bool { + return fi.Name() == "rabtap.json" + } + msg := `{ "Exchange": "exchange", "Body": "" }` + + dir, err := ioutil.TempDir("", "") + require.Nil(t, err) + defer os.RemoveAll(dir) + + metadataFile := filepath.Join(dir, "rabtap.json") + messageFile := filepath.Join(dir, "rabtap.dat") + + err = ioutil.WriteFile(metadataFile, []byte(msg), 0666) + require.Nil(t, err) + err = ioutil.WriteFile(messageFile, []byte("Hello123"), 0666) + require.Nil(t, err) + + metadata, err := LoadMetadataFilesFromDir(dir, ioutil.ReadDir, pred) + assert.Nil(t, err) + assert.Equal(t, 1, len(metadata)) + assert.Equal(t, path.Join(dir, "rabtap.json"), metadata[0].filename) + assert.Equal(t, "exchange", metadata[0].metadata.Exchange) +} + +func TestLoadMetadataFilesFromDirFailsOnInvalidDir(t *testing.T) { + pred := func(fi os.FileInfo) bool { + return true + } + dirReader := func(string) ([]os.FileInfo, error) { + return nil, errors.New("invalid dir") + } + _, err := LoadMetadataFilesFromDir("unused", dirReader, pred) + assert.NotNil(t, err) +} + +func TestReadPersistentRabtapMessageReturnsCorrectObject(t *testing.T) { + msg := `{ + "ContentType": "", + "ContentEncoding": "", + "DeliveryMode": 0, + "Priority": 0, + "CorrelationID": "", + "ReplyTo": "", + "Expiration": "", + "MessageID": "", + "Timestamp": "0001-01-01T00:00:00Z", + "Type": "", + "UserID": "", + "AppID": "", + "DeliveryTag": 1, + "Redelivered": false, + "Exchange": "amq.fanout", + "RoutingKey": "key", + "XRabtapReceivedTimestamp": "2019-12-29T21:51:33.52288901+01:00", + "Body": "SGVsbG8=" +} +` + filename := writeTempFile(t, msg) + defer os.Remove(filename) + + metadata, err := readRabtapPersistentMessage(filename) + + assert.Nil(t, err) + assert.Equal(t, "amq.fanout", metadata.Exchange) + assert.Equal(t, "key", metadata.RoutingKey) + expectedTs, _ := time.Parse(time.RFC3339Nano, "2019-12-29T21:51:33.52288901+01:00") + assert.Equal(t, expectedTs, metadata.XRabtapReceivedTimestamp) + // base64dec("SGVsbG=") == "Hello" + assert.Equal(t, []byte("Hello"), metadata.Body) + // etc +} + +func TestReadMetadataOfFilesFailsWithErrorIfAnyFileCouldNotBeRead(t *testing.T) { + + _, err := readMetadataOfFiles("/base", []string{"/this/file/should/not/exist"}) + assert.NotNil(t, err) +} + +func TestReadMetadataOfFilesReturnsExpectedMetadata(t *testing.T) { + msg := `{ + "ContentType": "", + "ContentEncoding": "", + "DeliveryMode": 0, + "Priority": 0, + "CorrelationID": "", + "ReplyTo": "", + "Expiration": "", + "MessageID": "", + "Timestamp": "0001-01-01T00:00:00Z", + "Type": "", + "UserID": "", + "AppID": "", + "DeliveryTag": 1, + "Redelivered": false, + "Exchange": "amq.fanout", + "RoutingKey": "key", + "XRabtapReceivedTimestamp": "2019-12-29T21:51:33.52288901+01:00", + "Body": "SGVsbG8=" +} +` + dir, filename := path.Split(writeTempFile(t, msg)) + defer os.Remove(filename) + + data, err := readMetadataOfFiles(dir, []string{filename}) + + assert.Nil(t, err) + assert.Equal(t, 1, len(data)) + assert.Equal(t, path.Join(dir, filename), data[0].filename) + assert.Equal(t, "amq.fanout", data[0].metadata.Exchange) + assert.Equal(t, "key", data[0].metadata.RoutingKey) + expectedTs, _ := time.Parse(time.RFC3339Nano, "2019-12-29T21:51:33.52288901+01:00") + assert.Equal(t, expectedTs, data[0].metadata.XRabtapReceivedTimestamp) + assert.Equal(t, []byte(""), data[0].metadata.Body) + // etc +} + +func TestCreateMessageFromDirReaderFuncReturnsErrorForUnknownFormat(t *testing.T) { + _, err := CreateMessageFromDirReaderFunc("invalid", []FilenameWithMetadata{}) + assert.NotNil(t, err) +} + +func TestCreateMessageFromDirReaderFuncReturnsCorrectReaderForJSONFormat(t *testing.T) { + // TODO complete test + + formats := []string{"json", "json-nopp"} + for _, format := range formats { + reader, err := CreateMessageFromDirReaderFunc(format, []FilenameWithMetadata{}) + assert.Nil(t, err) + assert.NotNil(t, reader) + + // TODO test the reader func with actual data + _, more, err := reader() + assert.False(t, more) + assert.Nil(t, err) + } +} + +func TestCreateMessageFromDirReaderFuncReturnsCorrectReaderForRawFormat(t *testing.T) { + // TODO complete test + + reader, err := CreateMessageFromDirReaderFunc("raw", []FilenameWithMetadata{}) + assert.Nil(t, err) + assert.NotNil(t, reader) + + // TODO test the reader func with actual data + _, more, err := reader() + assert.False(t, more) + assert.Nil(t, err) +} diff --git a/cmd/rabtap/message_reader_file.go b/cmd/rabtap/message_reader_file.go new file mode 100644 index 0000000..6a1448c --- /dev/null +++ b/cmd/rabtap/message_reader_file.go @@ -0,0 +1,71 @@ +// read persisted messages from files +// Copyright (C) 2019 Jan Delgado + +package main + +import ( + "encoding/json" + "fmt" + "io" + "io/ioutil" +) + +// MessageReaderFunc provides messages that can be sent to an exchange. +// returns the message to be published, a flag if more messages are to be read, +// and an error. +type MessageReaderFunc func() (RabtapPersistentMessage, bool, error) + +// readMessageFromRawFile reads a single messages from the given io.Reader +// which is typically stdin or a file. If reading from stdin, CTRL+D (linux) +// or CTRL+Z (Win) on an empty line terminates the reader. +// -> readRawMessage +func readMessageFromRawFile(reader io.Reader) ([]byte, error) { + return ioutil.ReadAll(reader) + //return amqp.Publishing{Body: buf}, false, err +} + +// -> readJSONMessage +func readMessageFromJSON(reader io.Reader) (RabtapPersistentMessage, error) { + var message RabtapPersistentMessage + + contents, err := ioutil.ReadAll(reader) + if err != nil { + return message, err + } + err = json.Unmarshal(contents, &message) + + return message, err +} + +// readMessageFromJSONStream reads JSON messages from the given decoder as long +// as there are messages available. +// -> readStreamedJSONMessage +func readMessageFromJSONStream(decoder *json.Decoder) (RabtapPersistentMessage, bool, error) { + var message RabtapPersistentMessage + err := decoder.Decode(&message) + if err != nil { + return message, false, err + } + return message, true, nil +} + +// CreateMessageReaderFunc returns a MessageReaderFunc that reads messages from +// the the given reader in the provided format +func CreateMessageReaderFunc(format string, reader io.ReadCloser) (MessageReaderFunc, error) { + switch format { + case "json-nopp": + fallthrough + case "json": + decoder := json.NewDecoder(reader) + return func() (RabtapPersistentMessage, bool, error) { + msg, more, err := readMessageFromJSONStream(decoder) + return msg, more, err + }, nil + case "raw": + return func() (RabtapPersistentMessage, bool, error) { + buf, err := readMessageFromRawFile(reader) + return RabtapPersistentMessage{Body: buf}, false, err + }, nil + } + return nil, fmt.Errorf("invaild format %s", format) +} diff --git a/cmd/rabtap/message_reader_file_test.go b/cmd/rabtap/message_reader_file_test.go new file mode 100644 index 0000000..bc0cd57 --- /dev/null +++ b/cmd/rabtap/message_reader_file_test.go @@ -0,0 +1,137 @@ +package main + +import ( + "bytes" + "encoding/json" + "io" + "io/ioutil" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadMessageFromRawFile(t *testing.T) { + reader := bytes.NewReader([]byte("hello")) + + buf, err := readMessageFromRawFile(reader) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), buf) +} + +func TestReadMessageFromJSON(t *testing.T) { + // note: base64dec("aGVsbG8=") == "hello" + data := ` + { + "Headers": null, + "ContentType": "text/plain", + "ContentEncoding": "", + "DeliveryMode": 0, + "Priority": 0, + "CorrelationID": "", + "ReplyTo": "", + "Expiration": "", + "MessageID": "", + "Timestamp": "2017-10-28T23:45:33+02:00", + "Type": "", + "UserID": "", + "AppID": "rabtap.testgen", + "DeliveryTag": 63, + "Redelivered": false, + "Exchange": "amq.topic", + "RoutingKey": "test-q-amq.topic-0", + "Body": "aGVsbG8=" + }` + reader := bytes.NewReader([]byte(data)) + + msg, err := readMessageFromJSON(reader) + assert.Nil(t, err) + assert.Equal(t, []byte("hello"), msg.Body) + assert.Equal(t, "amq.topic", msg.Exchange) + // TODO test additional attributes +} + +func TestReadMessageFromJSONStreamReturnsOneMessagePerCall(t *testing.T) { + // note: base64dec("aGVsbG8=") == "hello" + // base64dec("c2Vjb25kCg==") == "second\n" + data := ` + { + "Headers": null, + "ContentType": "text/plain", + "ContentEncoding": "", + "DeliveryMode": 0, + "Priority": 0, + "CorrelationID": "", + "ReplyTo": "", + "Expiration": "", + "MessageID": "", + "Timestamp": "2017-10-28T23:45:33+02:00", + "Type": "", + "UserID": "", + "AppID": "rabtap.testgen", + "DeliveryTag": 63, + "Redelivered": false, + "Exchange": "amq.topic", + "RoutingKey": "test-q-amq.topic-0", + "Body": "aGVsbG8=" + } + { + "Body": "c2Vjb25kCg==" + }` + //reader := ioutil.NopCloser(bytes.NewReader([]byte("hello world"))) // r type is io.ReadCloser + reader := bytes.NewReader([]byte(data)) + decoder := json.NewDecoder(reader) + + msg, more, err := readMessageFromJSONStream(decoder) + assert.Nil(t, err) + assert.True(t, more) + assert.Equal(t, []byte("hello"), msg.Body) + assert.Equal(t, "amq.topic", msg.Exchange) + // TODO test additional attributes + + msg, more, err = readMessageFromJSONStream(decoder) + assert.Nil(t, err) + assert.True(t, more) + assert.Equal(t, []byte("second\n"), msg.Body) + + msg, more, err = readMessageFromJSONStream(decoder) + assert.Equal(t, io.EOF, err) + assert.False(t, more) +} + +func TestCreateMessageReaderFuncReturnsErrorForUnknownFormat(t *testing.T) { + reader := ioutil.NopCloser(bytes.NewReader([]byte(""))) + _, err := CreateMessageReaderFunc("invalid", reader) + assert.NotNil(t, err) +} + +func TestCreateMessageReaderFuncReturnsJSONReaderForJSONFormats(t *testing.T) { + + for _, format := range []string{"json", "json-nopp"} { + reader := ioutil.NopCloser(bytes.NewReader([]byte(`{"Body": "aGVsbG8="}`))) + + readFunc, err := CreateMessageReaderFunc(format, reader) + assert.Nil(t, err) + + msg, more, err := readFunc() + assert.Nil(t, err) + assert.True(t, more) + assert.Equal(t, []byte("hello"), msg.Body) + + msg, more, err = readFunc() + assert.Equal(t, err, io.EOF) + assert.False(t, more) + } +} + +func TestCreateMessageReaderFuncReturnsRawFileReaderForRawFormats(t *testing.T) { + + reader := ioutil.NopCloser(bytes.NewReader([]byte("hello"))) + + readFunc, err := CreateMessageReaderFunc("raw", reader) + assert.Nil(t, err) + + msg, more, err := readFunc() + assert.Nil(t, err) + assert.False(t, more) + assert.Equal(t, []byte("hello"), msg.Body) +} diff --git a/cmd/rabtap/message_writer.go b/cmd/rabtap/message_writer.go index 8b36b0a..19342cb 100644 --- a/cmd/rabtap/message_writer.go +++ b/cmd/rabtap/message_writer.go @@ -15,7 +15,7 @@ import ( "github.com/streadway/amqp" ) -// RabtapPersistentMessage is a messages as persistet from/to a JSON file +// RabtapPersistentMessage is a messages as persisted from/to a JSON file // object can be initialiazed from amqp.Delivery and to amqp.Publishing type RabtapPersistentMessage struct { Headers map[string]interface{} `json:",omitempty"` diff --git a/go.mod b/go.mod index b622fdb..47bf2f0 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,7 @@ require ( github.com/sirupsen/logrus v1.3.0 github.com/streadway/amqp v0.0.0-20190225234609-30f8ed68076e github.com/stretchr/testify v1.3.0 - golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b // indirect - golang.org/x/net v0.0.0-20190226215741-afe646ca25a4 // indirect golang.org/x/sync v0.0.0-20190423024810-112230192c58 - golang.org/x/sys v0.0.0-20190226215855-775f8194d0f9 // indirect + golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 // indirect gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) diff --git a/go.sum b/go.sum index 8a11c5f..7041d80 100644 --- a/go.sum +++ b/go.sum @@ -32,14 +32,27 @@ golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/net v0.0.0-20190226215741-afe646ca25a4 h1:v9iNk7wDkdbc0AEntnZ+bru4iRMZ4+vI3W1knqDZijs= golang.org/x/net v0.0.0-20190226215741-afe646ca25a4/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190226215855-775f8194d0f9 h1:N26gncmS+iqc/W/SKhX3ElI5pkt72XYoRLgi5Z70LSc= golang.org/x/sys v0.0.0-20190226215855-775f8194d0f9/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114 h1:DnSr2mCsxyCE6ZgIkmcWUQY2R5cH/6wL7eIxEmQOMSE= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs= gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk= diff --git a/pkg/amqp_connector.go b/pkg/amqp_connector.go index 289f3c9..7ccd57e 100644 --- a/pkg/amqp_connector.go +++ b/pkg/amqp_connector.go @@ -50,20 +50,19 @@ func NewAmqpConnector(uri string, tlsConfig *tls.Config, logger logrus.StdLogger // Connect (re-)establishes the connection to RabbitMQ broker. func (s *AmqpConnector) Connect(ctx context.Context, worker AmqpWorkerFunc) error { - sessions := redial(ctx, s.uri, s.tlsConfig, s.logger) + sessions := redial(ctx, s.uri, s.tlsConfig, s.logger, FailEarly) for session := range sessions { s.logger.Printf("waiting for new session ...") sub, more := <-session if !more { - // closed + // closed. TODO propagate errors from redial() return errors.New("initial connection failed") } - s.logger.Printf("got new session ...") + s.logger.Printf("got new amqp session ...") action, err := worker(ctx, sub) if !action.shouldReconnect() { return err } } - s.logger.Print("amqp_connector.Connect exiting") return nil } diff --git a/pkg/amqp_message_loop.go b/pkg/amqp_message_loop.go index 77bd623..ec88904 100644 --- a/pkg/amqp_message_loop.go +++ b/pkg/amqp_message_loop.go @@ -32,9 +32,9 @@ func amqpMessageLoop(ctx context.Context, // Avoid blocking write to out when e.g. on the other end of the // channel the user pressed Ctrl+S to stop console output select { - case out <- NewTapMessage(&amqpMessage, received): case <-ctx.Done(): return doNotReconnect + case out <- NewTapMessage(&amqpMessage, received): } case <-ctx.Done(): diff --git a/pkg/amqp_message_loop_test.go b/pkg/amqp_message_loop_test.go index 5e32da7..5abbe57 100644 --- a/pkg/amqp_message_loop_test.go +++ b/pkg/amqp_message_loop_test.go @@ -9,7 +9,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestAmqpMessageLoopForwardsMessage(t *testing.T) { +func TestAmqpMessageLoopPanicsWithInvalidMessage(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) out := make(TapChannel) in := make(chan interface{}) @@ -67,6 +67,7 @@ func TestAmqpMessageLoopCancelBlockingWrite(t *testing.T) { // this second write blocks the write in the messageloop in <- amqp.Delivery{} + time.Sleep(1 * time.Second) cancel() select { diff --git a/pkg/session.go b/pkg/session.go index 79ab4b1..7e9cd25 100644 --- a/pkg/session.go +++ b/pkg/session.go @@ -13,6 +13,7 @@ import ( const ( retryDelay = 3 * time.Second + FailEarly = true ) // Session composes an amqp.Connection with an amqp.Channel @@ -36,12 +37,11 @@ func (s *Session) NewChannel() error { // channel in a Session struct. Closes returned chan when initial connection // attempt fails. func redial(ctx context.Context, url string, tlsConfig *tls.Config, - logger logrus.StdLogger) chan chan Session { + logger logrus.StdLogger, failEarly bool) chan chan Session { sessions := make(chan chan Session) go func() { - initial := true sess := make(chan Session) defer close(sessions) @@ -68,7 +68,7 @@ func redial(ctx context.Context, url string, tlsConfig *tls.Config, } } logger.Printf("session: cannot (re-)dial: %v: %q", err, url) - if initial { + if failEarly { close(sess) return } @@ -81,7 +81,7 @@ func redial(ctx context.Context, url string, tlsConfig *tls.Config, } } - initial = false + failEarly = false select { case sess <- Session{conn, ch}: diff --git a/pkg/session_test.go b/pkg/session_test.go index dcb34a4..04a8096 100644 --- a/pkg/session_test.go +++ b/pkg/session_test.go @@ -10,6 +10,7 @@ import ( "log" "os" "testing" + "time" "github.com/jandelgado/rabtap/pkg/testcommon" "github.com/stretchr/testify/assert" @@ -21,7 +22,7 @@ func TestSessionProvidesConnectionAndChannel(t *testing.T) { defer cancel() log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) - sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) + sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log, FailEarly) sessionFactory := <-sessions session := <-sessionFactory @@ -35,22 +36,43 @@ func TestSessionShutsDownProperlyWhenCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) - sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) + sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log, FailEarly) sessionFactory, more := <-sessions assert.True(t, more) + time.Sleep(1 * time.Second) cancel() + time.Sleep(1 * time.Second) + <-sessionFactory _, more = <-sessions assert.False(t, more) } +func TestSessionCanBeCancelledWhenSessionIsNotReadFromChannel(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + + log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) + sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log, FailEarly) + + sessionFactory, more := <-sessions + assert.True(t, more) + <-sessionFactory + + time.Sleep(1 * time.Second) + cancel() + time.Sleep(1 * time.Second) + + _, more = <-sessions + assert.False(t, more) +} + func TestSessionFailsEarlyWhenNoConnectionIsPossible(t *testing.T) { ctx := context.Background() log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) - sessions := redial(ctx, "amqp://localhost:1", &tls.Config{}, log) + sessions := redial(ctx, "amqp://localhost:1", &tls.Config{}, log, FailEarly) sessionFactory, more := <-sessions assert.True(t, more) @@ -62,12 +84,30 @@ func TestSessionFailsEarlyWhenNoConnectionIsPossible(t *testing.T) { assert.False(t, more) } +func TestSessionCanBeCancelledDuringRetryDelay(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + + log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) + sessions := redial(ctx, "amqp://localhost:1", &tls.Config{}, log, !FailEarly) + + sessionFactory, more := <-sessions + assert.True(t, more) + + time.Sleep(1 * time.Second) + cancel() + time.Sleep(1 * time.Second) + + _, more = <-sessionFactory + assert.False(t, more) +} + func TestSessionNewChannelReturnsNewChannel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() log := log.New(os.Stdout, "session_inttest: ", log.Lshortfile) - sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log) + sessions := redial(ctx, testcommon.IntegrationURIFromEnv(), &tls.Config{}, log, FailEarly) sessionFactory := <-sessions session := <-sessionFactory