Skip to content

Commit

Permalink
http api requests use Context to be cancellable (#52)
Browse files Browse the repository at this point in the history
  • Loading branch information
jandelgado authored Mar 28, 2021
1 parent 7c66bc1 commit fab387c
Show file tree
Hide file tree
Showing 16 changed files with 165 additions and 134 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@

# Changelog for rabtap

## v1.27 (2021-03-28)

* new: `info` and `close` commands are can now be cancelled by SIGTERM

## v1.26 (2021-03-26)

* fix: make client certificate auth work. This implements a workaround until
Expand Down
5 changes: 3 additions & 2 deletions cmd/rabtap/cmd_conn.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"crypto/tls"
"fmt"
"net/url"
Expand All @@ -9,13 +10,13 @@ import (
rabtap "github.com/jandelgado/rabtap/pkg"
)

func cmdConnClose(apiURL, connName, reason string, tlsConfig *tls.Config) error {
func cmdConnClose(ctx context.Context, apiURL, connName, reason string, tlsConfig *tls.Config) error {
url, err := url.Parse(apiURL)
if err != nil {
return err
}
client := rabtap.NewRabbitHTTPClient(url, tlsConfig)
err = client.CloseConnection(connName, reason)
err = client.CloseConnection(ctx, connName, reason)
failOnError(err, fmt.Sprintf("close connection '%s'", connName), os.Exit)
return err
}
8 changes: 5 additions & 3 deletions cmd/rabtap/cmd_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package main

import (
"context"
"crypto/tls"
"net/url"
"testing"
Expand Down Expand Up @@ -44,7 +45,7 @@ func TestCmdCloseConnection(t *testing.T) {
// we figure out the connections name by comparing the list of active
// connection before and after we created our test connection. Therefore,
// make sure this test runs isolated on the broker.
connsBefore, err := client.Connections()
connsBefore, err := client.Connections(context.TODO())
require.Nil(t, err)

// start the test connection to be terminated
Expand All @@ -53,7 +54,7 @@ func TestCmdCloseConnection(t *testing.T) {
// it takes a few seconds for the new connection to show up in the REST API
time.Sleep(time.Second * 5)

connsAfter, err := client.Connections()
connsAfter, err := client.Connections(context.TODO())
require.Nil(t, err)

// we add a notification callback and expect the cb to be called
Expand All @@ -66,7 +67,8 @@ func TestCmdCloseConnection(t *testing.T) {

// now close the newly created connection. TODO handle potential
// call to failOnError in cmdConnClose
err = cmdConnClose(url.String(), connToClose, "some reason", &tls.Config{})
err = cmdConnClose(context.TODO(),
url.String(), connToClose, "some reason", &tls.Config{})
require.Nil(t, err)

// ... and make sure it gets closed, notified by a message on the errorChan
Expand Down
3 changes: 2 additions & 1 deletion cmd/rabtap/cmd_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"crypto/tls"
"net/url"
"os"
Expand All @@ -21,7 +22,7 @@ import (
func exchangeExists(t *testing.T, apiURL *url.URL, exchange string) bool {
// TODO add a simple client to testcommon
client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{})
exchanges, err := client.Exchanges()
exchanges, err := client.Exchanges(context.TODO())
require.Nil(t, err)
return rabtap.FindExchangeByName(exchanges, "/", exchange) != -1
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/rabtap/cmd_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"io"
"os"

Expand All @@ -21,8 +22,8 @@ type CmdInfoArg struct {
// cmdInfo queries the rabbitMQ brokers REST api and dispays infos
// on exchanges, queues, bindings etc in a human readably fashion.
// TODO proper error handling
func cmdInfo(cmd CmdInfoArg) {
brokerInfo, err := cmd.client.BrokerInfo()
func cmdInfo(ctx context.Context, cmd CmdInfoArg) {
brokerInfo, err := cmd.client.BrokerInfo(ctx)
failOnError(err, "failed retrieving info from rabbitmq REST api", os.Exit)

treeBuilder := NewBrokerInfoTreeBuilder(cmd.treeConfig)
Expand Down
107 changes: 57 additions & 50 deletions cmd/rabtap/cmd_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"crypto/tls"
"net/url"
"os"
Expand Down Expand Up @@ -33,20 +34,21 @@ func Example_cmdInfoByExchangeInTextFormat() {
url, _ := url.Parse(mock.URL)
client := rabtap.NewRabbitHTTPClient(url, &tls.Config{})

cmdInfo(CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byExchange",
ShowConsumers: true,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
ShowStats: false,
NoColor: true},
out: os.Stdout})
cmdInfo(context.TODO(),
CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byExchange",
ShowConsumers: true,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
ShowStats: false,
NoColor: true},
out: os.Stdout})

// Output:
// http://rabbitmq/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab')
Expand Down Expand Up @@ -84,20 +86,21 @@ func Example_cmdInfoByConnectionInTextFormat() {
url, _ := url.Parse(mock.URL)
client := rabtap.NewRabbitHTTPClient(url, &tls.Config{})

cmdInfo(CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byConnection",
ShowConsumers: true,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
ShowStats: false,
NoColor: true},
out: os.Stdout})
cmdInfo(context.TODO(),
CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byConnection",
ShowConsumers: true,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{
Format: "text",
ShowStats: false,
NoColor: true},
out: os.Stdout})

// Output:
// http://rabbitmq/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab')
Expand Down Expand Up @@ -203,17 +206,19 @@ func TestCmdInfoByExchangeInDotFormat(t *testing.T) {
client := rabtap.NewRabbitHTTPClient(url, &tls.Config{})

testfunc := func() {
cmdInfo(CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byExchange",
ShowConsumers: false,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: os.Stdout})
cmdInfo(
context.TODO(),
CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byExchange",
ShowConsumers: false,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: os.Stdout})
}
result := testcommon.CaptureOutput(testfunc)
assert.Equal(t, strings.Trim(expectedResultDotByExchange, " \n"),
Expand Down Expand Up @@ -246,17 +251,19 @@ func TestCmdInfoByConnectionInDotFormat(t *testing.T) {
client := rabtap.NewRabbitHTTPClient(url, &tls.Config{})

testfunc := func() {
cmdInfo(CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byConnection",
ShowConsumers: false,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: os.Stdout})
cmdInfo(
context.TODO(),
CmdInfoArg{
rootNode: "http://rabbitmq/api",
client: client,
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: "byConnection",
ShowConsumers: false,
ShowDefaultExchange: false,
QueueFilter: TruePredicate,
OmitEmptyExchanges: false},
renderConfig: BrokerInfoRendererConfig{Format: "dot"},
out: os.Stdout})
}
result := testcommon.CaptureOutput(testfunc)
assert.Equal(t, strings.Trim(expectedResultDotByConnection, " \n"),
Expand Down
3 changes: 2 additions & 1 deletion cmd/rabtap/cmd_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"crypto/tls"
"net/url"
"os"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestIntegrationCmdQueueCreatePurgeiBindUnbindQueue(t *testing.T) {

// TODO add a simple client to testcommon
client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{})
queues, err := client.Queues()
queues, err := client.Queues(context.TODO())
assert.Nil(t, err)
i := rabtap.FindQueueByName(queues, "/", testQueue)
require.True(t, i != -1)
Expand Down
37 changes: 20 additions & 17 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,25 +72,28 @@ func getTLSConfig(insecureTLS bool, certFile string, keyFile string, caFile stri
return tlsConfig
}

func startCmdInfo(args CommandLineArgs, title string) {
func startCmdInfo(ctx context.Context, args CommandLineArgs, title string) {
queueFilter, err := NewPredicateExpression(args.QueueFilter)
failOnError(err, fmt.Sprintf("invalid queue filter predicate '%s'", args.QueueFilter), os.Exit)

apiURL, err := url.Parse(args.APIURI)
failOnError(err, "invalid api url", os.Exit)
cmdInfo(CmdInfoArg{
rootNode: title,
client: rabtap.NewRabbitHTTPClient(apiURL, getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile)),
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: args.InfoMode,
ShowConsumers: args.ShowConsumers,
ShowDefaultExchange: args.ShowDefaultExchange,
QueueFilter: queueFilter,
OmitEmptyExchanges: args.OmitEmptyExchanges},
renderConfig: BrokerInfoRendererConfig{
Format: args.Format,
ShowStats: args.ShowStats,
NoColor: args.NoColor},
out: NewColorableWriter(os.Stdout)})

cmdInfo(ctx,
CmdInfoArg{
rootNode: title,
client: rabtap.NewRabbitHTTPClient(apiURL, getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile)),
treeConfig: BrokerInfoTreeBuilderConfig{
Mode: args.InfoMode,
ShowConsumers: args.ShowConsumers,
ShowDefaultExchange: args.ShowDefaultExchange,
QueueFilter: queueFilter,
OmitEmptyExchanges: args.OmitEmptyExchanges},
renderConfig: BrokerInfoRendererConfig{
Format: args.Format,
ShowStats: args.ShowStats,
NoColor: args.NoColor},
out: NewColorableWriter(os.Stdout)})
}

// createMessageReaderForPublish returns a MessageReaderFunc that reads
Expand Down Expand Up @@ -183,7 +186,7 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) {
func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Config) {
switch args.Cmd {
case InfoCmd:
startCmdInfo(args, args.APIURI)
startCmdInfo(ctx, args, args.APIURI)
case SubCmd:
startCmdSubscribe(ctx, args)
case PubCmd:
Expand Down Expand Up @@ -212,7 +215,7 @@ func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Confi
cmdQueueUnbindFromExchange(args.AmqpURI, args.QueueName,
args.QueueBindingKey, args.ExchangeName, tlsConfig)
case ConnCloseCmd:
cmdConnClose(args.APIURI, args.ConnName,
cmdConnClose(ctx, args.APIURI, args.ConnName,
args.CloseReason, tlsConfig)
}
}
Expand Down
3 changes: 2 additions & 1 deletion cmd/rabtap/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package main

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -59,7 +60,7 @@ func Example_startCmdInfo() {
defer mock.Close()

args, _ := ParseCommandLineArgs([]string{"info", "--api", mock.URL, "--no-color"})
startCmdInfo(args, "http://rootnode")
startCmdInfo(context.TODO(), args, "http://rootnode")

// Output:
// http://rootnode (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab')
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/streadway/amqp v0.0.0-20190225234609-30f8ed68076e
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect
golang.org/x/net v0.0.0-20190620200207-3b0461eec859
golang.org/x/sync v0.0.0-20190423024810-112230192c58
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)
7 changes: 4 additions & 3 deletions pkg/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
package rabtap

import (
"context"
"errors"

"github.com/streadway/amqp"
Expand All @@ -11,10 +12,10 @@ import (
// DiscoverBindingsForExchange returns a string list of routing-keys that
// are used by the given exchange and broker. This list can be used to
// auto-tap to all queues on a given exchange
func DiscoverBindingsForExchange(rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error) {
func DiscoverBindingsForExchange(ctx context.Context, rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error) {

var bindingKeys []string
exchanges, err := rabbitAPIClient.Exchanges()
exchanges, err := rabbitAPIClient.Exchanges(ctx)

if err != nil {
return nil, err
Expand All @@ -37,7 +38,7 @@ func DiscoverBindingsForExchange(rabbitAPIClient *RabbitHTTPClient, vhost, excha
switch *exchangeType {
case amqp.ExchangeDirect:
// filter out all bindings for given exchange
bindings, err := rabbitAPIClient.Bindings()
bindings, err := rabbitAPIClient.Bindings(ctx)

if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit fab387c

Please sign in to comment.