diff --git a/CHANGELOG.md b/CHANGELOG.md index 7950aaa..d6901f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,10 @@ # Changelog for rabtap +## v1.27 (2021-03-28) + +* new: `info` and `close` commands are can now be cancelled by SIGTERM + ## v1.26 (2021-03-26) * fix: make client certificate auth work. This implements a workaround until diff --git a/cmd/rabtap/cmd_conn.go b/cmd/rabtap/cmd_conn.go index 4555020..998eb4f 100644 --- a/cmd/rabtap/cmd_conn.go +++ b/cmd/rabtap/cmd_conn.go @@ -1,6 +1,7 @@ package main import ( + "context" "crypto/tls" "fmt" "net/url" @@ -9,13 +10,13 @@ import ( rabtap "github.com/jandelgado/rabtap/pkg" ) -func cmdConnClose(apiURL, connName, reason string, tlsConfig *tls.Config) error { +func cmdConnClose(ctx context.Context, apiURL, connName, reason string, tlsConfig *tls.Config) error { url, err := url.Parse(apiURL) if err != nil { return err } client := rabtap.NewRabbitHTTPClient(url, tlsConfig) - err = client.CloseConnection(connName, reason) + err = client.CloseConnection(ctx, connName, reason) failOnError(err, fmt.Sprintf("close connection '%s'", connName), os.Exit) return err } diff --git a/cmd/rabtap/cmd_conn_test.go b/cmd/rabtap/cmd_conn_test.go index a2ae946..d8fcd35 100644 --- a/cmd/rabtap/cmd_conn_test.go +++ b/cmd/rabtap/cmd_conn_test.go @@ -4,6 +4,7 @@ package main import ( + "context" "crypto/tls" "net/url" "testing" @@ -44,7 +45,7 @@ func TestCmdCloseConnection(t *testing.T) { // we figure out the connections name by comparing the list of active // connection before and after we created our test connection. Therefore, // make sure this test runs isolated on the broker. - connsBefore, err := client.Connections() + connsBefore, err := client.Connections(context.TODO()) require.Nil(t, err) // start the test connection to be terminated @@ -53,7 +54,7 @@ func TestCmdCloseConnection(t *testing.T) { // it takes a few seconds for the new connection to show up in the REST API time.Sleep(time.Second * 5) - connsAfter, err := client.Connections() + connsAfter, err := client.Connections(context.TODO()) require.Nil(t, err) // we add a notification callback and expect the cb to be called @@ -66,7 +67,8 @@ func TestCmdCloseConnection(t *testing.T) { // now close the newly created connection. TODO handle potential // call to failOnError in cmdConnClose - err = cmdConnClose(url.String(), connToClose, "some reason", &tls.Config{}) + err = cmdConnClose(context.TODO(), + url.String(), connToClose, "some reason", &tls.Config{}) require.Nil(t, err) // ... and make sure it gets closed, notified by a message on the errorChan diff --git a/cmd/rabtap/cmd_exchange_test.go b/cmd/rabtap/cmd_exchange_test.go index 007c049..cce8f03 100644 --- a/cmd/rabtap/cmd_exchange_test.go +++ b/cmd/rabtap/cmd_exchange_test.go @@ -5,6 +5,7 @@ package main import ( + "context" "crypto/tls" "net/url" "os" @@ -21,7 +22,7 @@ import ( func exchangeExists(t *testing.T, apiURL *url.URL, exchange string) bool { // TODO add a simple client to testcommon client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{}) - exchanges, err := client.Exchanges() + exchanges, err := client.Exchanges(context.TODO()) require.Nil(t, err) return rabtap.FindExchangeByName(exchanges, "/", exchange) != -1 } diff --git a/cmd/rabtap/cmd_info.go b/cmd/rabtap/cmd_info.go index a62933a..d6bfb3c 100644 --- a/cmd/rabtap/cmd_info.go +++ b/cmd/rabtap/cmd_info.go @@ -3,6 +3,7 @@ package main import ( + "context" "io" "os" @@ -21,8 +22,8 @@ type CmdInfoArg struct { // cmdInfo queries the rabbitMQ brokers REST api and dispays infos // on exchanges, queues, bindings etc in a human readably fashion. // TODO proper error handling -func cmdInfo(cmd CmdInfoArg) { - brokerInfo, err := cmd.client.BrokerInfo() +func cmdInfo(ctx context.Context, cmd CmdInfoArg) { + brokerInfo, err := cmd.client.BrokerInfo(ctx) failOnError(err, "failed retrieving info from rabbitmq REST api", os.Exit) treeBuilder := NewBrokerInfoTreeBuilder(cmd.treeConfig) diff --git a/cmd/rabtap/cmd_info_test.go b/cmd/rabtap/cmd_info_test.go index f926d68..0586d1b 100644 --- a/cmd/rabtap/cmd_info_test.go +++ b/cmd/rabtap/cmd_info_test.go @@ -5,6 +5,7 @@ package main import ( + "context" "crypto/tls" "net/url" "os" @@ -33,20 +34,21 @@ func Example_cmdInfoByExchangeInTextFormat() { url, _ := url.Parse(mock.URL) client := rabtap.NewRabbitHTTPClient(url, &tls.Config{}) - cmdInfo(CmdInfoArg{ - rootNode: "http://rabbitmq/api", - client: client, - treeConfig: BrokerInfoTreeBuilderConfig{ - Mode: "byExchange", - ShowConsumers: true, - ShowDefaultExchange: false, - QueueFilter: TruePredicate, - OmitEmptyExchanges: false}, - renderConfig: BrokerInfoRendererConfig{ - Format: "text", - ShowStats: false, - NoColor: true}, - out: os.Stdout}) + cmdInfo(context.TODO(), + CmdInfoArg{ + rootNode: "http://rabbitmq/api", + client: client, + treeConfig: BrokerInfoTreeBuilderConfig{ + Mode: "byExchange", + ShowConsumers: true, + ShowDefaultExchange: false, + QueueFilter: TruePredicate, + OmitEmptyExchanges: false}, + renderConfig: BrokerInfoRendererConfig{ + Format: "text", + ShowStats: false, + NoColor: true}, + out: os.Stdout}) // Output: // http://rabbitmq/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab') @@ -84,20 +86,21 @@ func Example_cmdInfoByConnectionInTextFormat() { url, _ := url.Parse(mock.URL) client := rabtap.NewRabbitHTTPClient(url, &tls.Config{}) - cmdInfo(CmdInfoArg{ - rootNode: "http://rabbitmq/api", - client: client, - treeConfig: BrokerInfoTreeBuilderConfig{ - Mode: "byConnection", - ShowConsumers: true, - ShowDefaultExchange: false, - QueueFilter: TruePredicate, - OmitEmptyExchanges: false}, - renderConfig: BrokerInfoRendererConfig{ - Format: "text", - ShowStats: false, - NoColor: true}, - out: os.Stdout}) + cmdInfo(context.TODO(), + CmdInfoArg{ + rootNode: "http://rabbitmq/api", + client: client, + treeConfig: BrokerInfoTreeBuilderConfig{ + Mode: "byConnection", + ShowConsumers: true, + ShowDefaultExchange: false, + QueueFilter: TruePredicate, + OmitEmptyExchanges: false}, + renderConfig: BrokerInfoRendererConfig{ + Format: "text", + ShowStats: false, + NoColor: true}, + out: os.Stdout}) // Output: // http://rabbitmq/api (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab') @@ -203,17 +206,19 @@ func TestCmdInfoByExchangeInDotFormat(t *testing.T) { client := rabtap.NewRabbitHTTPClient(url, &tls.Config{}) testfunc := func() { - cmdInfo(CmdInfoArg{ - rootNode: "http://rabbitmq/api", - client: client, - treeConfig: BrokerInfoTreeBuilderConfig{ - Mode: "byExchange", - ShowConsumers: false, - ShowDefaultExchange: false, - QueueFilter: TruePredicate, - OmitEmptyExchanges: false}, - renderConfig: BrokerInfoRendererConfig{Format: "dot"}, - out: os.Stdout}) + cmdInfo( + context.TODO(), + CmdInfoArg{ + rootNode: "http://rabbitmq/api", + client: client, + treeConfig: BrokerInfoTreeBuilderConfig{ + Mode: "byExchange", + ShowConsumers: false, + ShowDefaultExchange: false, + QueueFilter: TruePredicate, + OmitEmptyExchanges: false}, + renderConfig: BrokerInfoRendererConfig{Format: "dot"}, + out: os.Stdout}) } result := testcommon.CaptureOutput(testfunc) assert.Equal(t, strings.Trim(expectedResultDotByExchange, " \n"), @@ -246,17 +251,19 @@ func TestCmdInfoByConnectionInDotFormat(t *testing.T) { client := rabtap.NewRabbitHTTPClient(url, &tls.Config{}) testfunc := func() { - cmdInfo(CmdInfoArg{ - rootNode: "http://rabbitmq/api", - client: client, - treeConfig: BrokerInfoTreeBuilderConfig{ - Mode: "byConnection", - ShowConsumers: false, - ShowDefaultExchange: false, - QueueFilter: TruePredicate, - OmitEmptyExchanges: false}, - renderConfig: BrokerInfoRendererConfig{Format: "dot"}, - out: os.Stdout}) + cmdInfo( + context.TODO(), + CmdInfoArg{ + rootNode: "http://rabbitmq/api", + client: client, + treeConfig: BrokerInfoTreeBuilderConfig{ + Mode: "byConnection", + ShowConsumers: false, + ShowDefaultExchange: false, + QueueFilter: TruePredicate, + OmitEmptyExchanges: false}, + renderConfig: BrokerInfoRendererConfig{Format: "dot"}, + out: os.Stdout}) } result := testcommon.CaptureOutput(testfunc) assert.Equal(t, strings.Trim(expectedResultDotByConnection, " \n"), diff --git a/cmd/rabtap/cmd_queue_test.go b/cmd/rabtap/cmd_queue_test.go index bad18f2..7e6f46e 100644 --- a/cmd/rabtap/cmd_queue_test.go +++ b/cmd/rabtap/cmd_queue_test.go @@ -5,6 +5,7 @@ package main import ( + "context" "crypto/tls" "net/url" "os" @@ -50,7 +51,7 @@ func TestIntegrationCmdQueueCreatePurgeiBindUnbindQueue(t *testing.T) { // TODO add a simple client to testcommon client := rabtap.NewRabbitHTTPClient(apiURL, &tls.Config{}) - queues, err := client.Queues() + queues, err := client.Queues(context.TODO()) assert.Nil(t, err) i := rabtap.FindQueueByName(queues, "/", testQueue) require.True(t, i != -1) diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index a3cbe6b..c709521 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -72,25 +72,28 @@ func getTLSConfig(insecureTLS bool, certFile string, keyFile string, caFile stri return tlsConfig } -func startCmdInfo(args CommandLineArgs, title string) { +func startCmdInfo(ctx context.Context, args CommandLineArgs, title string) { queueFilter, err := NewPredicateExpression(args.QueueFilter) failOnError(err, fmt.Sprintf("invalid queue filter predicate '%s'", args.QueueFilter), os.Exit) + apiURL, err := url.Parse(args.APIURI) failOnError(err, "invalid api url", os.Exit) - cmdInfo(CmdInfoArg{ - rootNode: title, - client: rabtap.NewRabbitHTTPClient(apiURL, getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile)), - treeConfig: BrokerInfoTreeBuilderConfig{ - Mode: args.InfoMode, - ShowConsumers: args.ShowConsumers, - ShowDefaultExchange: args.ShowDefaultExchange, - QueueFilter: queueFilter, - OmitEmptyExchanges: args.OmitEmptyExchanges}, - renderConfig: BrokerInfoRendererConfig{ - Format: args.Format, - ShowStats: args.ShowStats, - NoColor: args.NoColor}, - out: NewColorableWriter(os.Stdout)}) + + cmdInfo(ctx, + CmdInfoArg{ + rootNode: title, + client: rabtap.NewRabbitHTTPClient(apiURL, getTLSConfig(args.InsecureTLS, args.TLSCertFile, args.TLSKeyFile, args.TLSCaFile)), + treeConfig: BrokerInfoTreeBuilderConfig{ + Mode: args.InfoMode, + ShowConsumers: args.ShowConsumers, + ShowDefaultExchange: args.ShowDefaultExchange, + QueueFilter: queueFilter, + OmitEmptyExchanges: args.OmitEmptyExchanges}, + renderConfig: BrokerInfoRendererConfig{ + Format: args.Format, + ShowStats: args.ShowStats, + NoColor: args.NoColor}, + out: NewColorableWriter(os.Stdout)}) } // createMessageReaderForPublish returns a MessageReaderFunc that reads @@ -183,7 +186,7 @@ func startCmdTap(ctx context.Context, args CommandLineArgs) { func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Config) { switch args.Cmd { case InfoCmd: - startCmdInfo(args, args.APIURI) + startCmdInfo(ctx, args, args.APIURI) case SubCmd: startCmdSubscribe(ctx, args) case PubCmd: @@ -212,7 +215,7 @@ func dispatchCmd(ctx context.Context, args CommandLineArgs, tlsConfig *tls.Confi cmdQueueUnbindFromExchange(args.AmqpURI, args.QueueName, args.QueueBindingKey, args.ExchangeName, tlsConfig) case ConnCloseCmd: - cmdConnClose(args.APIURI, args.ConnName, + cmdConnClose(ctx, args.APIURI, args.ConnName, args.CloseReason, tlsConfig) } } diff --git a/cmd/rabtap/main_test.go b/cmd/rabtap/main_test.go index 098affb..9756b49 100644 --- a/cmd/rabtap/main_test.go +++ b/cmd/rabtap/main_test.go @@ -3,6 +3,7 @@ package main import ( + "context" "errors" "testing" @@ -59,7 +60,7 @@ func Example_startCmdInfo() { defer mock.Close() args, _ := ParseCommandLineArgs([]string{"info", "--api", mock.URL, "--no-color"}) - startCmdInfo(args, "http://rootnode") + startCmdInfo(context.TODO(), args, "http://rootnode") // Output: // http://rootnode (broker ver='3.6.9', mgmt ver='3.6.9', cluster='rabbit@08f57d1fe8ab') diff --git a/go.mod b/go.mod index 565cb33..25ac4a4 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/streadway/amqp v0.0.0-20190225234609-30f8ed68076e github.com/stretchr/testify v1.3.0 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect - golang.org/x/net v0.0.0-20190620200207-3b0461eec859 // indirect + golang.org/x/net v0.0.0-20190620200207-3b0461eec859 golang.org/x/sync v0.0.0-20190423024810-112230192c58 gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 ) diff --git a/pkg/discovery.go b/pkg/discovery.go index 7557cf2..e1af96f 100644 --- a/pkg/discovery.go +++ b/pkg/discovery.go @@ -3,6 +3,7 @@ package rabtap import ( + "context" "errors" "github.com/streadway/amqp" @@ -11,10 +12,10 @@ import ( // DiscoverBindingsForExchange returns a string list of routing-keys that // are used by the given exchange and broker. This list can be used to // auto-tap to all queues on a given exchange -func DiscoverBindingsForExchange(rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error) { +func DiscoverBindingsForExchange(ctx context.Context, rabbitAPIClient *RabbitHTTPClient, vhost, exchangeName string) ([]string, error) { var bindingKeys []string - exchanges, err := rabbitAPIClient.Exchanges() + exchanges, err := rabbitAPIClient.Exchanges(ctx) if err != nil { return nil, err @@ -37,7 +38,7 @@ func DiscoverBindingsForExchange(rabbitAPIClient *RabbitHTTPClient, vhost, excha switch *exchangeType { case amqp.ExchangeDirect: // filter out all bindings for given exchange - bindings, err := rabbitAPIClient.Bindings() + bindings, err := rabbitAPIClient.Bindings(ctx) if err != nil { return nil, err diff --git a/pkg/discovery_test.go b/pkg/discovery_test.go index b2c410a..ecf1d7b 100644 --- a/pkg/discovery_test.go +++ b/pkg/discovery_test.go @@ -3,6 +3,7 @@ package rabtap import ( + "context" "crypto/tls" "net/url" "testing" @@ -18,7 +19,7 @@ func TestDiscoveryUnknownExchange(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := DiscoverBindingsForExchange(client, "/", "unknown") + _, err := DiscoverBindingsForExchange(context.TODO(), client, "/", "unknown") assert.NotNil(t, err) } @@ -29,7 +30,7 @@ func TestDiscoveryDirectExchange(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := DiscoverBindingsForExchange(client, "/", "test-direct") + result, err := DiscoverBindingsForExchange(context.TODO(), client, "/", "test-direct") assert.Nil(t, err) assert.Equal(t, 2, len(result)) assert.Equal(t, "direct-q1", result[0]) @@ -43,7 +44,7 @@ func TestDiscoveryTopicExchange(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := DiscoverBindingsForExchange(client, "/", "test-topic") + result, err := DiscoverBindingsForExchange(context.TODO(), client, "/", "test-topic") assert.Nil(t, err) assert.Equal(t, 1, len(result)) assert.Equal(t, "#", result[0]) @@ -55,7 +56,7 @@ func TestDiscoveryFanoutExchange(t *testing.T) { defer mock.Close() url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := DiscoverBindingsForExchange(client, "/", "test-fanout") + result, err := DiscoverBindingsForExchange(context.TODO(), client, "/", "test-fanout") assert.Nil(t, err) assert.Equal(t, 1, len(result)) @@ -68,7 +69,7 @@ func TestDiscoveryHeadersExchange(t *testing.T) { defer mock.Close() url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := DiscoverBindingsForExchange(client, "/", "test-headers") + result, err := DiscoverBindingsForExchange(context.TODO(), client, "/", "test-headers") assert.Nil(t, err) assert.Equal(t, 1, len(result)) diff --git a/pkg/exchange_test.go b/pkg/exchange_test.go index 5c814a5..e42c2c6 100644 --- a/pkg/exchange_test.go +++ b/pkg/exchange_test.go @@ -10,6 +10,7 @@ package rabtap // $ sudo docker run --rm -ti -p5672:5672 rabbitmq:3-management) import ( + "context" "crypto/tls" "net/url" "testing" @@ -38,7 +39,7 @@ func TestIntegrationAmqpExchangeCreateRemove(t *testing.T) { client := NewRabbitHTTPClient(url, &tls.Config{}) // make sure exchange does not exist before creation - exchanges, err := client.Exchanges() + exchanges, err := client.Exchanges(context.TODO()) assert.Nil(t, err) assert.Equal(t, -1, findExchange(testName, exchanges)) @@ -50,7 +51,7 @@ func TestIntegrationAmqpExchangeCreateRemove(t *testing.T) { assert.Nil(t, err) // check if exchange was created - exchanges, err = client.Exchanges() + exchanges, err = client.Exchanges(context.TODO()) assert.Nil(t, err) assert.NotEqual(t, -1, findExchange(testName, exchanges)) @@ -59,7 +60,7 @@ func TestIntegrationAmqpExchangeCreateRemove(t *testing.T) { assert.Nil(t, err) // check if exchange was deleted - exchanges, err = client.Exchanges() + exchanges, err = client.Exchanges(context.TODO()) assert.Nil(t, err) assert.Equal(t, -1, findExchange(testName, exchanges)) } diff --git a/pkg/queue_test.go b/pkg/queue_test.go index 383b7da..df79f36 100644 --- a/pkg/queue_test.go +++ b/pkg/queue_test.go @@ -10,6 +10,7 @@ package rabtap // $ sudo docker run --rm -ti -p5672:5672 rabbitmq:3-management) import ( + "context" "crypto/tls" "net/url" "testing" @@ -76,7 +77,7 @@ func TestIntegrationAmqpQueueCreateBindUnbindAndRemove(t *testing.T) { client := NewRabbitHTTPClient(url, &tls.Config{}) // make sure queue does not exist before creation - queues, err := client.Queues() + queues, err := client.Queues(context.TODO()) assert.Nil(t, err) assert.Equal(t, -1, findQueue(queueTestName, queues)) @@ -88,28 +89,28 @@ func TestIntegrationAmqpQueueCreateBindUnbindAndRemove(t *testing.T) { assert.Nil(t, err) // check if queue was created - queues, err = client.Queues() + queues, err = client.Queues(context.TODO()) assert.Nil(t, err) assert.NotEqual(t, -1, findQueue(queueTestName, queues)) // bind queue to exchange err = BindQueueToExchange(session, queueTestName, keyTestName, exchangeTestName) assert.Nil(t, err) - bindings, err := client.Bindings() + bindings, err := client.Bindings(context.TODO()) assert.Nil(t, err) assert.NotEqual(t, -1, findBinding(queueTestName, exchangeTestName, keyTestName, bindings)) // unbind queue from exchange err = UnbindQueueFromExchange(session, queueTestName, keyTestName, exchangeTestName) assert.Nil(t, err) - bindings, err = client.Bindings() + bindings, err = client.Bindings(context.TODO()) assert.Nil(t, err) assert.Equal(t, -1, findBinding(queueTestName, exchangeTestName, keyTestName, bindings)) // finally remove queue err = RemoveQueue(session, queueTestName, false, false) assert.Nil(t, err) - queues, err = client.Queues() + queues, err = client.Queues(context.TODO()) assert.Nil(t, err) assert.Equal(t, -1, findQueue(queueTestName, queues)) } diff --git a/pkg/rabbitmq_rest_client.go b/pkg/rabbitmq_rest_client.go index fefe3b1..2ec733b 100644 --- a/pkg/rabbitmq_rest_client.go +++ b/pkg/rabbitmq_rest_client.go @@ -4,6 +4,7 @@ package rabtap import ( "bytes" + "context" "crypto/tls" "encoding/json" "errors" @@ -11,6 +12,8 @@ import ( "net/url" "reflect" + "golang.org/x/net/context/ctxhttp" + "golang.org/x/sync/errgroup" ) @@ -40,11 +43,11 @@ type httpRequest struct { // getResource gets resource constructed from s.url and equest.url and // deserialized the resource into an request.t type, which is returned. // TODO split function in http and unmarshaling part -func (s RabbitHTTPClient) getResource(request httpRequest) (interface{}, error) { +func (s RabbitHTTPClient) getResource(ctx context.Context, request httpRequest) (interface{}, error) { r := reflect.New(request.t).Interface() url := s.url.String() + "/" + request.path - resp, err := s.client.Get(url) + resp, err := ctxhttp.Get(ctx, s.client, url) if err != nil { return r, err } @@ -65,14 +68,14 @@ func (s RabbitHTTPClient) getResource(request httpRequest) (interface{}, error) } // delResource make DELETE request to given relative path -func (s RabbitHTTPClient) delResource(path string) error { +func (s RabbitHTTPClient) delResource(ctx context.Context, path string) error { url := s.url.String() + "/" + path req, err := http.NewRequest("DELETE", url, nil) if err != nil { return err } - resp, err := s.client.Do(req) + resp, err := ctxhttp.Do(ctx, s.client, req) if err != nil { return err } @@ -95,58 +98,59 @@ type BrokerInfo struct { } // Overview returns the /overview resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Overview() (RabbitOverview, error) { - res, err := s.getResource(httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) +func (s RabbitHTTPClient) Overview(ctx context.Context) (RabbitOverview, error) { + res, err := s.getResource(ctx, httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) return *res.(*RabbitOverview), err } // Connections returns the /connections resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Connections() ([]RabbitConnection, error) { - res, err := s.getResource(httpRequest{"connections", reflect.TypeOf([]RabbitConnection{})}) +func (s RabbitHTTPClient) Connections(ctx context.Context) ([]RabbitConnection, error) { + res, err := s.getResource(ctx, httpRequest{"connections", reflect.TypeOf([]RabbitConnection{})}) return *res.(*[]RabbitConnection), err } // Exchanges returns the /exchanges resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Exchanges() ([]RabbitExchange, error) { - res, err := s.getResource(httpRequest{"exchanges", reflect.TypeOf([]RabbitExchange{})}) +func (s RabbitHTTPClient) Exchanges(ctx context.Context) ([]RabbitExchange, error) { + res, err := s.getResource(ctx, httpRequest{"exchanges", reflect.TypeOf([]RabbitExchange{})}) return *res.(*[]RabbitExchange), err } // Queues returns the /queues resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Queues() ([]RabbitQueue, error) { - res, err := s.getResource(httpRequest{"queues", reflect.TypeOf([]RabbitQueue{})}) +func (s RabbitHTTPClient) Queues(ctx context.Context) ([]RabbitQueue, error) { + res, err := s.getResource(ctx, httpRequest{"queues", reflect.TypeOf([]RabbitQueue{})}) return *res.(*[]RabbitQueue), err } // Consumers returns the /consumers resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Consumers() ([]RabbitConsumer, error) { - res, err := s.getResource(httpRequest{"consumers", reflect.TypeOf([]RabbitConsumer{})}) +func (s RabbitHTTPClient) Consumers(ctx context.Context) ([]RabbitConsumer, error) { + res, err := s.getResource(ctx, httpRequest{"consumers", reflect.TypeOf([]RabbitConsumer{})}) return *res.(*[]RabbitConsumer), err } // Bindings returns the /bindings resource of the RabbitMQ REST API -func (s RabbitHTTPClient) Bindings() ([]RabbitBinding, error) { - res, err := s.getResource(httpRequest{"bindings", reflect.TypeOf([]RabbitBinding{})}) +func (s RabbitHTTPClient) Bindings(ctx context.Context) ([]RabbitBinding, error) { + res, err := s.getResource(ctx, httpRequest{"bindings", reflect.TypeOf([]RabbitBinding{})}) return *res.(*[]RabbitBinding), err } // BrokerInfo gets all resources of the broker in parallel // TODO use a ctx to for timeout/cancellation -func (s RabbitHTTPClient) BrokerInfo() (BrokerInfo, error) { - var g errgroup.Group +func (s RabbitHTTPClient) BrokerInfo(ctx context.Context) (BrokerInfo, error) { + g, ctx := errgroup.WithContext(ctx) var r BrokerInfo - g.Go(func() (err error) { r.Overview, err = s.Overview(); return }) - g.Go(func() (err error) { r.Connections, err = s.Connections(); return }) - g.Go(func() (err error) { r.Exchanges, err = s.Exchanges(); return }) - g.Go(func() (err error) { r.Queues, err = s.Queues(); return }) - g.Go(func() (err error) { r.Consumers, err = s.Consumers(); return }) - g.Go(func() (err error) { r.Bindings, err = s.Bindings(); return }) + + g.Go(func() (err error) { r.Overview, err = s.Overview(ctx); return }) + g.Go(func() (err error) { r.Connections, err = s.Connections(ctx); return }) + g.Go(func() (err error) { r.Exchanges, err = s.Exchanges(ctx); return }) + g.Go(func() (err error) { r.Queues, err = s.Queues(ctx); return }) + g.Go(func() (err error) { r.Consumers, err = s.Consumers(ctx); return }) + g.Go(func() (err error) { r.Bindings, err = s.Bindings(ctx); return }) return r, g.Wait() } // CloseConnection closes a connection by DELETING the associated resource -func (s RabbitHTTPClient) CloseConnection(conn, reason string) error { - return s.delResource("connections/" + conn) +func (s RabbitHTTPClient) CloseConnection(ctx context.Context, conn, reason string) error { + return s.delResource(ctx, "connections/"+conn) } // FindQueueByName searches in the queues array for a queue with the given diff --git a/pkg/rabbitmq_rest_client_test.go b/pkg/rabbitmq_rest_client_test.go index d4f49ae..acfafe9 100644 --- a/pkg/rabbitmq_rest_client_test.go +++ b/pkg/rabbitmq_rest_client_test.go @@ -3,6 +3,7 @@ package rabtap import ( + "context" "crypto/tls" "encoding/json" "fmt" @@ -22,7 +23,7 @@ func TestGetAllResources(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - all, err := client.BrokerInfo() + all, err := client.BrokerInfo(context.TODO()) assert.Nil(t, err) assert.Equal(t, "3.6.9", all.Overview.ManagementVersion) @@ -36,7 +37,7 @@ func TestGetAllResources(t *testing.T) { func TestGetAllResourcesOnInvalidHostReturnErr(t *testing.T) { url, _ := url.Parse("localhost:1") client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := client.BrokerInfo() + _, err := client.BrokerInfo(context.TODO()) assert.NotNil(t, err) } @@ -46,7 +47,7 @@ func TestGetResourceInvalidUriReturnsError(t *testing.T) { defer mock.Close() url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := client.getResource(httpRequest{"invalid", reflect.TypeOf(RabbitOverview{})}) + _, err := client.getResource(context.TODO(), httpRequest{"invalid", reflect.TypeOf(RabbitOverview{})}) assert.NotNil(t, err) } @@ -62,7 +63,7 @@ func TestGetResourceStatusNot200(t *testing.T) { url, _ := url.Parse(ts.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := client.getResource(httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) + _, err := client.getResource(context.TODO(), httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) assert.NotNil(t, err) // TODO check error } @@ -78,7 +79,7 @@ func TestGetResourceInvalidJSON(t *testing.T) { url, _ := url.Parse(ts.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := client.getResource(httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) + _, err := client.getResource(context.TODO(), httpRequest{"overview", reflect.TypeOf(RabbitOverview{})}) assert.NotNil(t, err) // TODO check error } @@ -90,7 +91,7 @@ func TestRabbitClientGetExchanges(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := client.Exchanges() + result, err := client.Exchanges(context.TODO()) assert.Nil(t, err) assert.Equal(t, 12, len(result)) assert.Equal(t, "", (result)[0].Name) @@ -108,7 +109,7 @@ func TestRabbitClientGetQueues(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := client.Queues() + result, err := client.Queues(context.TODO()) assert.Nil(t, err) assert.Equal(t, 8, len(result)) assert.Equal(t, "/", (result)[0].Vhost) @@ -124,7 +125,7 @@ func TestRabbitClientGetOverview(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - result, err := client.Overview() + result, err := client.Overview(context.TODO()) assert.Nil(t, err) assert.Equal(t, "3.6.9", result.ManagementVersion) } @@ -137,7 +138,7 @@ func TestRabbitClientGetBindings(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - _, err := client.Bindings() + _, err := client.Bindings(context.TODO()) assert.Nil(t, err) // TODO @@ -151,7 +152,7 @@ func TestRabbitClientGetConsumers(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - consumer, err := client.Consumers() + consumer, err := client.Consumers(context.TODO()) assert.Nil(t, err) assert.Equal(t, 2, len(consumer)) assert.Equal(t, "some_consumer", consumer[0].ConsumerTag) @@ -167,7 +168,7 @@ func TestRabbitClientGetConnections(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - conn, err := client.Connections() + conn, err := client.Connections(context.TODO()) assert.Nil(t, err) assert.Equal(t, 1, len(conn)) assert.Equal(t, "172.17.0.1:40874 -> 172.17.0.2:5672", conn[0].Name) @@ -251,7 +252,7 @@ func TestRabbitClientGetConsumersChannelDetailsIsEmptyArray(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - consumer, err := client.Consumers() + consumer, err := client.Consumers(context.TODO()) assert.Nil(t, err) assert.Equal(t, 2, len(consumer)) @@ -269,7 +270,8 @@ func TestRabbitClientCloseExistingConnection(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - err := client.CloseConnection("172.17.0.1:40874 -> 172.17.0.2:5672", "reason") + err := client.CloseConnection(context.TODO(), + "172.17.0.1:40874 -> 172.17.0.2:5672", "reason") assert.Nil(t, err) } @@ -281,7 +283,7 @@ func TestRabbitClientCloseNonExistingConnectionRaisesError(t *testing.T) { url, _ := url.Parse(mock.URL) client := NewRabbitHTTPClient(url, &tls.Config{}) - err := client.CloseConnection("DOES NOT EXIST", "reason") + err := client.CloseConnection(context.TODO(), "DOES NOT EXIST", "reason") assert.NotNil(t, err) }