Skip to content

Commit

Permalink
replay messages as recorded (#39)
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado authored Jan 28, 2020
1 parent 0161500 commit 1fdeb0d
Show file tree
Hide file tree
Showing 28 changed files with 1,370 additions and 237 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@

# Changelog for rabtap

## v1.22 (2020-01-28)

* The `pub` command now allows ialso to replay messages from a direcory previously
recorded. The pub command also honors the recorded timestamps and delays the
messages during replay. The signature of of the `pub` command was changed
(see README.md). Note that the exchange is now optional and will be taken
from the message metadata that is published.

## v1.21 (2019-12-14)

* new option: `--format FORMAT` which controls output format in `tap`,
Expand Down
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ build-all: build build-mac build-win

build-mac:
cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags \
"-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_DARWIN64)
"-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_DARWIN64)

build-linux:
cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags \
"-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_LINUX64)
"-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_LINUX64)

build-win:
cd cmd/rabtap && GO111MODULE=on CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags \
"-X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_WIN64)
"-s -w -X main.RabtapAppVersion=$(VERSION)" -o ../../$(BINARY_WIN64)

tags: $(SOURCE)
@gotags -f tags $(SOURCE)
Expand Down Expand Up @@ -58,8 +58,7 @@ toxiproxy-cmd:

# run rabbitmq server for integration test using docker container.
run-broker:
sudo docker run -ti --rm -p 5672:5672 \
-p 15672:15672 rabbitmq:3-management
podman run -ti --rm -p 5672:5672 -p 15672:15672 rabbitmq:3-management

dist-clean: clean
rm -f *.out $(BINARY_WIN64) $(BINARY_LINUX64) $(BINARY_DARWIN64)
Expand Down
102 changes: 68 additions & 34 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ Usage:
rabtap tap EXCHANGES [--uri=URI] [--saveto=DIR] [--format=FORMAT] [-jknsv]
rabtap (tap --uri=URI EXCHANGES)... [--saveto=DIR] [--format=FORMAT] [-jknsv]
rabtap sub QUEUE [--uri URI] [--saveto=DIR] [--format=FORMAT] [--no-auto-ack] [-jksvn]
rabtap pub [--uri=URI] EXCHANGE [FILE] [--routingkey=KEY] [--format=FORMAT] [-jkv]
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT]
[--delay=DELAY | --speed=FACTOR] [-jkv]
rabtap exchange create EXCHANGE [--uri=URI] [--type=TYPE] [-adkv]
rabtap exchange rm EXCHANGE [--uri=URI] [-kv]
rabtap queue create QUEUE [--uri=URI] [-adkv]
Expand All @@ -141,26 +142,32 @@ Usage:
rabtap conn close CONNECTION [--api=APIURI] [--reason=REASON] [-kv]
rabtap --version
Options:
Arguments and options:
EXCHANGES comma-separated list of exchanges and binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
FILE file to publish in pub mode. If omitted, stdin will be read.
SOURCE file or directory to publish in pub mode. If omitted, stdin will be read.
QUEUE name of a queue.
CONNECTION name of a connection.
DIR directory to read messages from.
-a, --autodelete create auto delete exchange/queue.
--api=APIURI connect to given API server. If APIURI is omitted,
the environment variable RABTAP_APIURI will be used.
-b, --bindingkey=KEY binding key to use in bind queue command.
--by-connection output of info command starts with connections.
--consumers include consumers and connections in output of info command.
--delay=DELAY Time to wait between sending messages during publish.
If not set then messages will be delayed as recorded.
The value must be suffixed with a time unit, e.g. ms, s etc.
-d, --durable create durable exchange/queue.
--exchange=EXCHANGE Optional exchange to publish to. If omitted, exchange will
be taken from message being published (see JSON message format).
--filter=EXPR Predicate for info command to filter queues [default: true]
--format=FORMAT * for tap, pub, sub command: format to write/read messages to console
and optionally to file (when --saveto DIR is given).
Valid options are: "raw", "json", "json-nopp". Default: raw
and optionally to file (when --saveto DIR is given).
Valid options are: "raw", "json", "json-nopp". Default: raw
* for info command: controls generated output format. Valid
options are: "text", "dot". Default: text
options are: "text", "dot". Default: text
-h, --help print this help.
-j, --json Deprecated. Use "--format json" instead.
-k, --insecure allow insecure TLS connections (no certificate check).
Expand All @@ -172,10 +179,13 @@ Options:
when the channel is closed.
--omit-empty don't show echanges without bindings in info command.
--reason=REASON reason why the connection was closed [default: closed by rabtap].
-r, --routingkey=KEY routing key to use in publish mode.
-r, --routingkey=KEY routing key to use in publish mode. If omitted, routing key
will be taken from message being published (see JSON
message format).
--saveto=DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
-s, --silent suppress message output to stdout.
--speed=FACTOR Speed factor to use during publish [default: 1.0].
--stats include statistics in output of info command.
-t, --type=TYPE exchange type [default: fanout].
--uri=URI connect to given AQMP broker. If omitted, the
Expand All @@ -200,7 +210,7 @@ Examples:
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
rabtap info
rabtap info --filter "binding.Source == 'amq.topic'" -o
rabtap info --filter "binding.Source == 'amq.topic'" --omit-empty
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
```

Expand All @@ -213,7 +223,7 @@ Rabtap understands the following commands:
binding). Simulatanous
* `sub` - subscribes to a queue and consumes messages sent to the queue (acts
like a RabbitMQ consumer)
* `pub` - send messages to an exchange.
* `pub` - publish messages to an exchange, optionally with the timing as recorded.
* `info` - show broker related info (exchanges, queues, bindings, stats). The
features of an exchange are displayed in square brackets with `D` (durable),
`AD` (auto delete) and `I` (internal). The features of a queue are displayed
Expand Down Expand Up @@ -402,17 +412,34 @@ Example assumes that `RABTAP_AMQPURI` environment variable is set, as the
#### Publish messages

The `pub` command is used to publish messages to an exchange with a routing
key. Messages can be published either in raw format, in which they are send
as-is, or in [JSON-format, as described here](#json-message-format), which
includes message metadata and the body in a single JSON document.

* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to
exchange amqp.fanout
* `$ rabtap pub amq.direct -r routingKey message.json --format json` - publish
message(s) in JSON format to exchange `amq.direct` with routing key
`routingKey`.
* `$ rabtap pub amq.direct -r routingKey --json < message.json` - same
as above, but read message(s) from stdin.
key. The messages to be published are either read from a file, or from a
directory which contains previously recorded messages (e.g. using the
`--saveto` option of the `tap` command). Messages can be published either in
raw format, in which they are send as-is, or in [JSON-format, as described
here](#json-message-format), which includes message metadata and the body in a
single JSON document.

The general form of the `pub` command is
```
rabtap pub [--uri=URI] [SOURCE] [--exchange=EXCHANGE] [--routingkey=KEY] [--format=FORMAT]
[--delay=DELAY | --speed=FACTOR] [-jkv]
```

* `$ echo hello | rabtap pub amq.fanout` - publish "hello" to exchange amqp.fanout
* `$ rabtap pub messages.json --format=json` - messages are read from file `messages.json`
in [raptab JSON format](#json-message-format). Target exchange and routing
keys are read from the messages meta data. The `messages.json` file can
contain multiple JSON documents as it is treated as a JSON stream. Rabtap
will honor the `XRabtapReceived` timestamps of the messages and by default
delay the messages as they were recorded. This behaviour can be overridden
by the `--delay` and `--speed` options.
* `$ rabtap pub amq.direct -r myKey --format=json messages.json --delay=0s` - as
before, but publish messages always to exchange `amq.direct` with routing key
`myKey` and without any delays.
* `$ rabtap pub amq.direct -r myKey --format=raw somedir --delay=0s` - as
before, but assuming that `somedir` is a directory, the messages are read
from message files previously recorded to this directory and replayed in the
order they were recorded.

#### Poor mans shovel

Expand Down Expand Up @@ -523,7 +550,6 @@ messages in the following format:
}
...
```

Note that in JSON mode, the `Body` is base64 encoded.

## Filtering output of info command
Expand Down Expand Up @@ -563,14 +589,14 @@ available in the expression as variables:
The examples assume that `RABTAP_APIURI` environment variable points to the
broker to be used, e.g. `http://guest:guest@localhost:15672/api`).

* `rabtap info --filter "exchange.Name == 'amq.direct'" --omit-empty`: print
* `rabtap info --filter "exchange.Name == 'amq.direct'" --omit-empty` - print
only queues bound to exchange `amq.direct` and skip all empty exchanges.
* `rabtap info --filter "queue.Name =~ '.*test.*'" --omit-empty`: print all
* `rabtap info --filter "queue.Name =~ '.*test.*'" --omit-empty` - print all
queues with `test` in their name.
* `rabtap info --filter "queue.Name =~ '.*test.*' && exchange.Type == 'topic'" --omit-empty`: like
* `rabtap info --filter "queue.Name =~ '.*test.*' && exchange.Type == 'topic'" --omit-empty` - like
before, but consider only exchanges of type `topic`.
* `rabtap info --filter "queue.Consumers > 0" --omit --stats --consumers`: print all queues with at
one consumer
* `rabtap info --filter "queue.Consumers > 0" --omit --stats --consumers` - print
all queues with at least one consumer

### Type reference

Expand All @@ -580,6 +606,9 @@ transformed to golang types.

#### Exchange type

<details>
<summary>Definition of the Exchange type</summary>

```go
type Exchange struct {
Name string
Expand All @@ -600,9 +629,13 @@ type Exchange struct {
}
}
```
</details>

#### Queue type

<details>
<summary>Definition of the Queue type</summary>

```go
type Queue struct {
MessagesDetails struct {
Expand Down Expand Up @@ -664,9 +697,13 @@ type Queue struct {
Memory int
}
```
</details>

#### Binding type

<details>
<summary>Definition of the Binding type</summary>

```go
type Binding struct {
Source string
Expand All @@ -678,25 +715,22 @@ type Binding struct {
}
```

</details>

## Build from source

### Download and build using go get

```
$ GO111MODULE=on go get github.com/jandelgado/rabtap/cmd/rabtap
```

### Build using Makefile and tests

To build rabtap from source, you need [go](https://golang.org/) (version >= 12)
and the following tools installed:

* [ineffassign](https://github.com/gordonklaus/ineffassign)
* [misspell](https://github.com/client9/misspell/cmd/misspell)
* [golint](https://github.com/golang/lint)
* [gocyclo](https://github.com/fzipp/gocyclo)
To build rabtap from source, you need [go](https://golang.org/) (version >= 1.12)
and [golangci-lint](https://github.com/golangci/golangci-lint) installed.

```
$ export GO111MODULE=on
$ git clone https://github.com/jandelgado/rabtap && cd rabtap
$ make test -or- make short-test
$ make
Expand Down
46 changes: 45 additions & 1 deletion cmd/rabtap/cmd_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,48 @@

package main

// TODO
import (
"crypto/tls"
"net/url"
"os"
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/jandelgado/rabtap/pkg/testcommon"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// exchangeExists queries the API to check if a given exchange exists
func exchangeExists(t *testing.T, apiURL *url.URL, exchange string) bool {
// TODO add a simple client to testcommon
client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{})
exchanges, err := client.Exchanges()
require.Nil(t, err)
return rabtap.FindExchangeByName(exchanges, "/", exchange) != -1
}

func TestIntegrationCmdExchangeCreateRemoveExchange(t *testing.T) {
// integration tests creation and removal of exchange
oldArgs := os.Args
defer func() { os.Args = oldArgs }()

const testExchange = "cmd-exchange-test"

amqpURI := testcommon.IntegrationURIFromEnv()
apiURL, _ := url.Parse(testcommon.IntegrationAPIURIFromEnv())

assert.False(t, exchangeExists(t, apiURL, testExchange))
os.Args = []string{"rabtap", "exchange", "create", testExchange, "--uri", amqpURI}
main()
time.Sleep(2 * time.Second)
assert.True(t, exchangeExists(t, apiURL, testExchange))

// TODO validation

os.Args = []string{"rabtap", "exchange", "rm", testExchange, "--uri", amqpURI}
main()
time.Sleep(2 * time.Second)
assert.False(t, exchangeExists(t, apiURL, testExchange))
}
Loading

0 comments on commit 1fdeb0d

Please sign in to comment.