Skip to content

Commit

Permalink
new command 'conn' to perform connection relation actions added.
Browse files Browse the repository at this point in the history
Currently server side closing of connections is supported. Use
`rabtap info --consumer` to find the name of the connection to
close, then `rabtap close <connection-name>' to close the
connection. See READNE.md for details.
  • Loading branch information
jandelgado committed May 6, 2018
1 parent 915c005 commit 4dbc7d5
Show file tree
Hide file tree
Showing 14 changed files with 298 additions and 17 deletions.
13 changes: 9 additions & 4 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
# Changelog
# Changelog for rabtap

## [unreleased]
## v1.8 (2018-05-06)

### Added
* `--consumers` option of the `info` command now prints also information on

* a changelog ;)
* new `--consumers` option of the `info` command prints also information on
the connection.
* new command `conn` for connection related operations. Currently allows
to close a connection with `rabtap conn close <connection-name>`.

### Changed
* minor changes to output of `info` command (i.e. some values now are quoted)

* minor changes to output of `info` command (i.e. some values are now quoted)



36 changes: 36 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ and exchanges, inspect broker.
* [Publish messages](#publish-messages)
* [Messages consumer (subscribe)](#messages-consumer-subscribe)
* [Poor mans shovel](#poor-mans-shovel)
* [Close connection](#close-connection)
* [JSON message format](#json-message-format)
* [Build from source](#build-from-source)
* [Test data generator](#test-data-generator)
Expand Down Expand Up @@ -57,6 +58,10 @@ Output of `rabtap info` command:

![info mode](doc/images/info.png)

Output of `rabtap info --stats` command, showing additional statistics:

![info mode](doc/images/info-stats.png)

Output of rabtap running in `tap` mode, showing message meta data
with unset attributes filtered out and the message body:

Expand Down Expand Up @@ -86,6 +91,7 @@ Usage:
rabtap queue create QUEUE [--uri URI] [-adkv]
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Examples:
Expand All @@ -100,13 +106,19 @@ Examples:
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
raptap info
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
Options:
EXCHANGES comma-separated list of exchanges and binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
FILE file to publish in pub mode. If omitted, stdin will
be read.
QUEUE name of a queue.
CONNECTION name of a connection.
-a, --autodelete create auto delete exchange/queue.
--api APIURI connect to given API server. If APIURI is omitted,
the environment variable RABTAP_APIURI will be used.
Expand All @@ -119,6 +131,8 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
-r, --routingkey KEY routing key to use in publish mode.
--saveto DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
Expand Down Expand Up @@ -149,6 +163,7 @@ Rabtap understand the following commands:

* `queue` - create/bind/remove queue
* `exchange` - create/remove exhange
* `connection` - close connections

See the examples section for further information.

Expand Down Expand Up @@ -310,6 +325,27 @@ $ rabtap tap --uri amqp://broker1 my-topic-exchange:# --json | \
rabtap pub --uri amqp://broker2 amq.direct -r routingKey --json
```

#### Close connection

The `conn` command allows to close a connection. The name of the connection to
be closed is expected as parameter. Use the `info` command with the
`--consumers` option to find the connection associated with a queue. Example:

```
$ rabtap info
http://localhost:15672/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@ae1ad1477419')
└── Vhost /
├── amq.direct (exchange, type 'direct', [D])
:
└── test-topic (exchange, type 'topic', [AD])
├── test-q-test-topic-0 (queue, key='test-q-test-topic-0', running, [])
│ └── __rabtap-consumer-4823a3c0 (consumer user='guest', chan='172.17.0.1:59228 -> 172.17.0.2:5672 (1)')
│ └── '172.17.0.1:59228 -> 172.17.0.2:5672' (connection client='https://github.com/streadway/amqp', host='172.17.0.2:5672', peer='172.17.0.1:59228')
├── test-q-test-topic-1 (queue, key='test-q-test-topic-1', running, [])
:
$ rabtap conn close '172.17.0.1:59228 -> 172.17.0.2:5672'
```

## JSON message format

When using the `--json` option, messages are print/read as a stream of JSON
Expand Down
16 changes: 16 additions & 0 deletions cmd/main/cmd_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package main

import (
"crypto/tls"
"fmt"
"os"

rabtap "github.com/jandelgado/rabtap/pkg"
)

func cmdConnClose(apiURI, connName, reason string, tlsConfig *tls.Config) error {
client := rabtap.NewRabbitHTTPClient(apiURI, tlsConfig)
err := client.CloseConnection(connName, reason)
failOnError(err, fmt.Sprintf("close connection '%s'", connName), os.Exit)
return err
}
80 changes: 80 additions & 0 deletions cmd/main/cmd_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright (C) 2017 Jan Delgado
// +build integration

package main

import (
"crypto/tls"
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/jandelgado/rabtap/pkg/testcommon"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func findClosedConnName(connectionsBefore []rabtap.RabbitConnection,
connectionsAfter []rabtap.RabbitConnection) string {
// given to lists of connections, find the first connection by name which
// is in the first, but not in the second list.
for _, ca := range connectionsAfter {
found := false
for _, cb := range connectionsBefore {
if ca.Name == cb.Name {
found = true
break
}
}
if !found {
return ca.Name
}
}
return ""
}

func TestCmdCloseConnection(t *testing.T) {

uri := testcommon.IntegrationAPIURIFromEnv()
client := rabtap.NewRabbitHTTPClient(uri, &tls.Config{})

// we can not get the name of a connection through the API of the AMQP client. So
// we figure out the connections name by comparing the list of active
// connection before and after we created our test connection. Therefore,
// make sure this test runs isolated on the broker.
connsBefore, err := client.Connections()
require.Nil(t, err)

// start the test connection to be terminated
conn, _ := testcommon.IntegrationTestConnection(t, "", "", 0, false)

// it takes a few seconds for the new connection to show up in the REST API
time.Sleep(time.Second * 5)

connsAfter, err := client.Connections()
require.Nil(t, err)

// we add a notification callback and expect the cb to be called
// when we close the connection via the API
errorChan := make(chan *amqp.Error)
conn.NotifyClose(errorChan)

connToClose := findClosedConnName(connsBefore, connsAfter)
require.NotEqual(t, "", connToClose)

// now close the newly created connection. TODO handle potential
// call to failOnError in cmdConnClose
err = cmdConnClose(uri, connToClose, "some reason", &tls.Config{})
require.Nil(t, err)

// ... and make sure it gets closed, notified by a message on the errorChan
connClosed := false
select {
case <-errorChan:
connClosed = true
case <-time.After(time.Second * 2):
assert.Fail(t, "did not receive notification within expected time")
}
assert.True(t, connClosed)
}
3 changes: 2 additions & 1 deletion cmd/main/cmd_subscribe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

package main

// cmd_{sub, queueCreate, queueBind, queueDelete} integration test
// cmd_{exchangeCreate, sub, queueCreate, queueBind, queueDelete}
// integration test

import (
"crypto/tls"
Expand Down
55 changes: 47 additions & 8 deletions cmd/main/command_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Usage:
rabtap queue create QUEUE [--uri URI] [-adkv]
rabtap queue bind QUEUE to EXCHANGE --bindingkey=KEY [--uri URI] [-kv]
rabtap queue rm QUEUE [--uri URI] [-kv]
rabtap conn close CONNECTION [--reason=REASON] [--api APIURI] [-kv]
rabtap --version
Examples:
Expand All @@ -43,13 +44,19 @@ Examples:
rabtap queue bind JDQ to amq.direct --bindingkey=key
rabtap queue rm JDQ
# use RABTAP_APIURI environment variable to specify mgmt api uri instead of --api
export RABTAP_APIURI=http://guest:guest@localhost:15672/api
raptap info
rabtap conn close "172.17.0.1:40874 -> 172.17.0.2:5672"
Options:
EXCHANGES comma-separated list of exchanges and binding keys,
e.g. amq.topic:# or exchange1:key1,exchange2:key2.
EXCHANGE name of an exchange, e.g. amq.direct.
FILE file to publish in pub mode. If omitted, stdin will
be read.
QUEUE name of a queue.
CONNECTION name of a connection.
-a, --autodelete create auto delete exchange/queue.
--api APIURI connect to given API server. If APIURI is omitted,
the environment variable RABTAP_APIURI will be used.
Expand All @@ -62,6 +69,8 @@ Options:
metadata and body (as-is) are saved separately.
-k, --insecure allow insecure TLS connections (no certificate check).
-n, --no-color don't colorize output (also environment variable NO_COLOR)
--reason=REASON reason why the connection was closed
[default: closed by rabtap].
-r, --routingkey KEY routing key to use in publish mode.
--saveto DIR also save messages and metadata to DIR.
--show-default include default exchange in output info command.
Expand Down Expand Up @@ -96,6 +105,8 @@ const (
QueueRemoveCmd
// QueueBindCmd binds a queue to an exchange
QueueBindCmd
// ConnCloseCmd closes a connection
ConnCloseCmd
)

type commonArgs struct {
Expand Down Expand Up @@ -128,6 +139,9 @@ type CommandLineArgs struct {
Autodelete bool // queue create, exchange create
SaveDir *string // save mode: optional directory to stores files to
ShowDefaultExchange bool

ConnName string // conn mode: name of connection
CloseReason string // conn mode: reason of close
}

// getAmqpURI returns the ith entry of amqpURIs array or the value
Expand All @@ -153,6 +167,19 @@ func parseAmqpURI(args map[string]interface{}) (string, error) {
return uri, nil
}

func parseAPIURI(args map[string]interface{}) (string, error) {
var apiURI string
if args["--api"] != nil {
apiURI = args["--api"].(string)
} else {
apiURI = os.Getenv("RABTAP_APIURI")
}
if apiURI == "" {
return "", fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment")
}
return apiURI, nil
}

func parseCommonArgs(args map[string]interface{}) commonArgs {
return commonArgs{
Verbose: args["--verbose"].(bool),
Expand All @@ -169,16 +196,26 @@ func parseInfoCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
ShowStats: args["--stats"].(bool),
ShowDefaultExchange: args["--show-default"].(bool)}

var apiURI string
if args["--api"] != nil {
apiURI = args["--api"].(string)
} else {
apiURI = os.Getenv("RABTAP_APIURI")
var err error
if result.APIURI, err = parseAPIURI(args); err != nil {
return result, err
}
if apiURI == "" {
return CommandLineArgs{}, fmt.Errorf("--api omitted but RABTAP_APIURI not set in environment")
return result, nil
}

func parseConnCmdArgs(args map[string]interface{}) (CommandLineArgs, error) {
result := CommandLineArgs{
commonArgs: parseCommonArgs(args)}

var err error
if result.APIURI, err = parseAPIURI(args); err != nil {
return result, err
}
if args["close"].(bool) {
result.Cmd = ConnCloseCmd
result.ConnName = args["CONNECTION"].(string)
result.CloseReason = args["--reason"].(string)
}
result.APIURI = apiURI
return result, nil
}

Expand Down Expand Up @@ -307,6 +344,8 @@ func ParseCommandLineArgs(cliArgs []string) (CommandLineArgs, error) {
return parseQueueCmdArgs(args)
} else if args["exchange"].(bool) {
return parseExchangeCmdArgs(args)
} else if args["conn"].(bool) {
return parseConnCmdArgs(args)
}
return CommandLineArgs{}, fmt.Errorf("command missing")
}
23 changes: 23 additions & 0 deletions cmd/main/command_line_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,29 @@ func TestCliRemoveExchange(t *testing.T) {
assert.Equal(t, "uri", args.AmqpURI)
}

func TestCliCloseConnectionWithDefaultReason(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"conn", "close", "conn-name", "--api", "uri"})

assert.Nil(t, err)
assert.Equal(t, ConnCloseCmd, args.Cmd)
assert.Equal(t, "uri", args.APIURI)
assert.Equal(t, "conn-name", args.ConnName)
assert.Equal(t, "closed by rabtap", args.CloseReason)
}

func TestCliCloseConnection(t *testing.T) {
args, err := ParseCommandLineArgs(
[]string{"conn", "close", "conn-name", "--api", "uri",
"--reason", "reason"})

assert.Nil(t, err)
assert.Equal(t, ConnCloseCmd, args.Cmd)
assert.Equal(t, "uri", args.APIURI)
assert.Equal(t, "conn-name", args.ConnName)
assert.Equal(t, "reason", args.CloseReason)
}

func TestParseNoColorFromEnvironment(t *testing.T) {
const key = "NO_COLOR"
os.Setenv(key, "1")
Expand Down
3 changes: 3 additions & 0 deletions cmd/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,8 @@ func main() {
case QueueBindCmd:
cmdQueueBindToExchange(args.AmqpURI, args.QueueName,
args.QueueBindingKey, args.ExchangeName, tlsConfig)
case ConnCloseCmd:
cmdConnClose(args.APIURI, args.ConnName,
args.CloseReason, tlsConfig)
}
}
Binary file added doc/images/info-stats.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions pkg/amqp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func (s *AmqpConnector) Connected() bool {

// Try to connect to the RabbitMQ server as long as it takes to establish a
// connection. Will be interrupted by any message on the control channel.
// TODO fail on first errornous connection attempt, only re-connect later.
func (s *AmqpConnector) redial() (*amqp.Connection, error) {
s.connection = nil
s.connected.Store(stateConnecting)
Expand Down
Loading

0 comments on commit 4dbc7d5

Please sign in to comment.