From 33403f6ce14447bab5dfb7029bdb488e27a5b19c Mon Sep 17 00:00:00 2001 From: jandelgado Date: Wed, 7 Oct 2020 23:05:27 +0200 Subject: [PATCH] Fix amqp headers (#45) make sure amqp.Publishing.Header always uses amqp.Table, also in sub-structures, especially when unmarshalled from JSON. Without special treatment, generic map[string]interface{} types are used, which are incompatible with go-amqp's type system. See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227 --- CHANGELOG.md | 6 ++++ cmd/rabtap/main.go | 3 -- cmd/rabtap/message_reader_file.go | 1 - cmd/rabtap/message_reader_file_test.go | 1 - cmd/rabtap/message_writer.go | 40 ++++++++++++++++++++++++-- cmd/rabtap/message_writer_test.go | 20 +++++++++++++ cmd/rabtap/subscribe.go | 2 +- cmd/testgen/testgen.go | 2 +- pkg/publish.go | 2 +- pkg/rabbitmq_rest_client.go | 32 +++++++++------------ pkg/testcommon/test_common.go | 3 -- 11 files changed, 81 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd394f3..ad9f301 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,12 @@ # Changelog for rabtap +## v1.24 (2020-09-28) + +* new: support TLS client certificates (contributed by Francois Gouteroux) +* fix: make sure that headers in amqp.Publishing are always using amqp.Table + structures, which could caused problems before. + ## v1.23 (2020-04-09) * fix: avoid endless recursion in info command (#42) diff --git a/cmd/rabtap/main.go b/cmd/rabtap/main.go index 3978fc4..7ac150f 100644 --- a/cmd/rabtap/main.go +++ b/cmd/rabtap/main.go @@ -14,9 +14,6 @@ import ( "sort" "time" - //"net/http" - //_ "net/http/pprof" - rabtap "github.com/jandelgado/rabtap/pkg" "github.com/sirupsen/logrus" ) diff --git a/cmd/rabtap/message_reader_file.go b/cmd/rabtap/message_reader_file.go index 6a1448c..ea33c2a 100644 --- a/cmd/rabtap/message_reader_file.go +++ b/cmd/rabtap/message_reader_file.go @@ -21,7 +21,6 @@ type MessageReaderFunc func() (RabtapPersistentMessage, bool, error) // -> readRawMessage func readMessageFromRawFile(reader io.Reader) ([]byte, error) { return ioutil.ReadAll(reader) - //return amqp.Publishing{Body: buf}, false, err } // -> readJSONMessage diff --git a/cmd/rabtap/message_reader_file_test.go b/cmd/rabtap/message_reader_file_test.go index bc0cd57..1e8320c 100644 --- a/cmd/rabtap/message_reader_file_test.go +++ b/cmd/rabtap/message_reader_file_test.go @@ -77,7 +77,6 @@ func TestReadMessageFromJSONStreamReturnsOneMessagePerCall(t *testing.T) { { "Body": "c2Vjb25kCg==" }` - //reader := ioutil.NopCloser(bytes.NewReader([]byte("hello world"))) // r type is io.ReadCloser reader := bytes.NewReader([]byte(data)) decoder := json.NewDecoder(reader) diff --git a/cmd/rabtap/message_writer.go b/cmd/rabtap/message_writer.go index 19342cb..4eba1c2 100644 --- a/cmd/rabtap/message_writer.go +++ b/cmd/rabtap/message_writer.go @@ -46,11 +46,46 @@ type RabtapPersistentMessage struct { Body []byte } +// ensureTable returns an object where all map[string]interface{} +// are replaced by amqp.Table{} so it is compatible with the amqp +// libs type system when it comes to passing headers, which expects (nested) +// amqp.Table structures. +// +// See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227 +func ensureTable(m interface{}) interface{} { + switch x := m.(type) { + + case []interface{}: + a := make([]interface{}, len(x)) + for i := range x { + a[i] = ensureTable(x[i]) + } + return a + + case amqp.Table: + m := amqp.Table{} + for k, v := range x { + m[k] = ensureTable(v) + } + return m + + case map[string]interface{}: + m := amqp.Table{} + for k, v := range x { + m[k] = ensureTable(v) + } + return m + + default: + return x + } +} + // CreateTimestampFilename returns a filename based on a RFC3339Nano // timstamp where all ":" are replaced with "_" func CreateTimestampFilename(t time.Time) string { basename := t.Format(time.RFC3339Nano) - return strings.Replace(basename, ":", "_", -1) + return strings.ReplaceAll(basename, ":", "_") } // NewRabtapPersistentMessage creates RabtapPersistentMessage object @@ -80,8 +115,9 @@ func NewRabtapPersistentMessage(message rabtap.TapMessage) RabtapPersistentMessa // ToAmqpPublishing converts message to an amqp.Publishing object func (s RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing { + headers := ensureTable(s.Headers) return amqp.Publishing{ - Headers: s.Headers, + Headers: headers.(amqp.Table), ContentType: s.ContentType, ContentEncoding: s.ContentEncoding, Priority: s.Priority, diff --git a/cmd/rabtap/message_writer_test.go b/cmd/rabtap/message_writer_test.go index 08ef45a..39cb2b0 100644 --- a/cmd/rabtap/message_writer_test.go +++ b/cmd/rabtap/message_writer_test.go @@ -34,6 +34,26 @@ var testMessage = &amqp.Delivery{ Body: []byte("simple test message."), } +func TestEnsureTableKeepsTable(t *testing.T) { + table := amqp.Table{"test": "a"} + assert.Equal(t, table, ensureTable(table)) +} + +func TestEnsureTableKeepsArray(t *testing.T) { + array := []interface{}{"a"} + assert.Equal(t, array, ensureTable(array)) +} + +func TestEnsureTableTransformsMapToTable(t *testing.T) { + m := map[string]interface{}{"k": "v"} + assert.Equal(t, amqp.Table{"k": "v"}, ensureTable(m)) +} + +func TestEnsureTableKeepsBasicType(t *testing.T) { + s := "test" + assert.Equal(t, s, ensureTable(s)) +} + func TestJSONMarshalIndentMarshalsToIndentedJSON(t *testing.T) { data, err := JSONMarshalIndent(map[string]string{"Test": "ABC"}) assert.Nil(t, err) diff --git a/cmd/rabtap/subscribe.go b/cmd/rabtap/subscribe.go index 24896f5..76e8f81 100644 --- a/cmd/rabtap/subscribe.go +++ b/cmd/rabtap/subscribe.go @@ -42,7 +42,7 @@ func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel, log.Debug("subscribe: messageReceiveLoop: channel closed.") return nil } - log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message) + log.Debugf("subscribe: messageReceiveLoop: new message %+v", message) tmpCh := make(rabtap.TapChannel) go func() { m := <-tmpCh diff --git a/cmd/testgen/testgen.go b/cmd/testgen/testgen.go index 7f77fae..e6331d4 100644 --- a/cmd/testgen/testgen.go +++ b/cmd/testgen/testgen.go @@ -159,7 +159,7 @@ func createTopology(ch *amqp.Channel, exchanges []string, numTestQueues int) { log.Printf("creating queue %s", queueName) createQueue(ch, queueName) bindingKey, headers := getBindingKeyForExchange(exchange, i) - log.Printf("binding queue %s to exchange %s with bindingkey `%s` and headers %#+v", + log.Printf("binding queue %s to exchange %s with bindingkey `%s` and headers %+v", queueName, exchange, bindingKey, headers) bindQueue(ch, queueName, exchange, bindingKey, headers) } diff --git a/pkg/publish.go b/pkg/publish.go index d039241..751c27a 100644 --- a/pkg/publish.go +++ b/pkg/publish.go @@ -58,7 +58,7 @@ func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorker *message.Publishing) if err != nil { - s.logger.Print("publishing error %#+v", err) + s.logger.Printf("publishing error %w", err) // error publishing message - reconnect. return doReconnect, err } diff --git a/pkg/rabbitmq_rest_client.go b/pkg/rabbitmq_rest_client.go index 5ea442c..ce63bc4 100644 --- a/pkg/rabbitmq_rest_client.go +++ b/pkg/rabbitmq_rest_client.go @@ -481,28 +481,24 @@ type RabbitQueue struct { // RabbitBinding models the /bindings resource of the rabbitmq http api type RabbitBinding struct { - Source string `json:"source"` - Vhost string `json:"vhost"` - Destination string `json:"destination"` - DestinationType string `json:"destination_type"` - RoutingKey string `json:"routing_key"` - //Arguments struct { - Arguments map[string]interface{} `json:"arguments,omitempty"` - //} `json:"arguments"` - PropertiesKey string `json:"properties_key"` + Source string `json:"source"` + Vhost string `json:"vhost"` + Destination string `json:"destination"` + DestinationType string `json:"destination_type"` + RoutingKey string `json:"routing_key"` + Arguments map[string]interface{} `json:"arguments,omitempty"` + PropertiesKey string `json:"properties_key"` } // RabbitExchange models the /exchanges resource of the rabbitmq http api type RabbitExchange struct { - Name string `json:"name"` - Vhost string `json:"vhost"` - Type string `json:"type"` - Durable bool `json:"durable"` - AutoDelete bool `json:"auto_delete"` - Internal bool `json:"internal"` - Arguments map[string]interface{} `json:"arguments,omitempty"` - //Arguments struct { - //} `json:"arguments"` + Name string `json:"name"` + Vhost string `json:"vhost"` + Type string `json:"type"` + Durable bool `json:"durable"` + AutoDelete bool `json:"auto_delete"` + Internal bool `json:"internal"` + Arguments map[string]interface{} `json:"arguments,omitempty"` MessageStats struct { PublishOut int `json:"publish_out"` PublishOutDetails struct { diff --git a/pkg/testcommon/test_common.go b/pkg/testcommon/test_common.go index 6086895..af344e6 100644 --- a/pkg/testcommon/test_common.go +++ b/pkg/testcommon/test_common.go @@ -144,7 +144,6 @@ func PublishTestMessages(t *testing.T, ch *amqp.Channel, numMessages int, // inject messages into exchange. Each message should become visible // in the tap-exchange defined above. for i := 1; i <= numMessages; i++ { - //log.Printf("publishing message to exchange '%s' with routingkey '%s'", exchangeName, routingKey) // publish the test message err := ch.Publish( exchangeName, @@ -185,13 +184,11 @@ func VerifyTestMessageOnQueue(t *testing.T, ch *amqp.Channel, consumer string, n if string(d.Body) == "Hello" { numReceived++ } - //log.Printf("%s: %d received original message...", consumer, numReceived) // Await NumExpectedMessages if numReceived == numExpected { success <- numReceived } } - //log.Printf("%s: Exiting receiver", consumer) }() }