Skip to content

Commit

Permalink
Fix amqp headers (#45)
Browse files Browse the repository at this point in the history
make sure amqp.Publishing.Header always uses amqp.Table,
also in sub-structures, especially when unmarshalled from JSON.
Without special treatment, generic map[string]interface{} types
are used, which are incompatible with go-amqp's type system.

See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227
  • Loading branch information
jandelgado authored Oct 7, 2020
1 parent e24bafc commit 33403f6
Show file tree
Hide file tree
Showing 11 changed files with 81 additions and 31 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

# Changelog for rabtap

## v1.24 (2020-09-28)

* new: support TLS client certificates (contributed by Francois Gouteroux)
* fix: make sure that headers in amqp.Publishing are always using amqp.Table
structures, which could caused problems before.

## v1.23 (2020-04-09)

* fix: avoid endless recursion in info command (#42)
Expand Down
3 changes: 0 additions & 3 deletions cmd/rabtap/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,6 @@ import (
"sort"
"time"

//"net/http"
//_ "net/http/pprof"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/sirupsen/logrus"
)
Expand Down
1 change: 0 additions & 1 deletion cmd/rabtap/message_reader_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ type MessageReaderFunc func() (RabtapPersistentMessage, bool, error)
// -> readRawMessage
func readMessageFromRawFile(reader io.Reader) ([]byte, error) {
return ioutil.ReadAll(reader)
//return amqp.Publishing{Body: buf}, false, err
}

// -> readJSONMessage
Expand Down
1 change: 0 additions & 1 deletion cmd/rabtap/message_reader_file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func TestReadMessageFromJSONStreamReturnsOneMessagePerCall(t *testing.T) {
{
"Body": "c2Vjb25kCg=="
}`
//reader := ioutil.NopCloser(bytes.NewReader([]byte("hello world"))) // r type is io.ReadCloser
reader := bytes.NewReader([]byte(data))
decoder := json.NewDecoder(reader)

Expand Down
40 changes: 38 additions & 2 deletions cmd/rabtap/message_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,46 @@ type RabtapPersistentMessage struct {
Body []byte
}

// ensureTable returns an object where all map[string]interface{}
// are replaced by amqp.Table{} so it is compatible with the amqp
// libs type system when it comes to passing headers, which expects (nested)
// amqp.Table structures.
//
// See https://github.com/streadway/amqp/blob/e6b33f460591b0acb2f13b04ef9cf493720ffe17/types.go#L227
func ensureTable(m interface{}) interface{} {
switch x := m.(type) {

case []interface{}:
a := make([]interface{}, len(x))
for i := range x {
a[i] = ensureTable(x[i])
}
return a

case amqp.Table:
m := amqp.Table{}
for k, v := range x {
m[k] = ensureTable(v)
}
return m

case map[string]interface{}:
m := amqp.Table{}
for k, v := range x {
m[k] = ensureTable(v)
}
return m

default:
return x
}
}

// CreateTimestampFilename returns a filename based on a RFC3339Nano
// timstamp where all ":" are replaced with "_"
func CreateTimestampFilename(t time.Time) string {
basename := t.Format(time.RFC3339Nano)
return strings.Replace(basename, ":", "_", -1)
return strings.ReplaceAll(basename, ":", "_")
}

// NewRabtapPersistentMessage creates RabtapPersistentMessage object
Expand Down Expand Up @@ -80,8 +115,9 @@ func NewRabtapPersistentMessage(message rabtap.TapMessage) RabtapPersistentMessa

// ToAmqpPublishing converts message to an amqp.Publishing object
func (s RabtapPersistentMessage) ToAmqpPublishing() amqp.Publishing {
headers := ensureTable(s.Headers)
return amqp.Publishing{
Headers: s.Headers,
Headers: headers.(amqp.Table),
ContentType: s.ContentType,
ContentEncoding: s.ContentEncoding,
Priority: s.Priority,
Expand Down
20 changes: 20 additions & 0 deletions cmd/rabtap/message_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,26 @@ var testMessage = &amqp.Delivery{
Body: []byte("simple test message."),
}

func TestEnsureTableKeepsTable(t *testing.T) {
table := amqp.Table{"test": "a"}
assert.Equal(t, table, ensureTable(table))
}

func TestEnsureTableKeepsArray(t *testing.T) {
array := []interface{}{"a"}
assert.Equal(t, array, ensureTable(array))
}

func TestEnsureTableTransformsMapToTable(t *testing.T) {
m := map[string]interface{}{"k": "v"}
assert.Equal(t, amqp.Table{"k": "v"}, ensureTable(m))
}

func TestEnsureTableKeepsBasicType(t *testing.T) {
s := "test"
assert.Equal(t, s, ensureTable(s))
}

func TestJSONMarshalIndentMarshalsToIndentedJSON(t *testing.T) {
data, err := JSONMarshalIndent(map[string]string{"Test": "ABC"})
assert.Nil(t, err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/rabtap/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func messageReceiveLoop(ctx context.Context, messageChan rabtap.TapChannel,
log.Debug("subscribe: messageReceiveLoop: channel closed.")
return nil
}
log.Debugf("subscribe: messageReceiveLoop: new message %#+v", message)
log.Debugf("subscribe: messageReceiveLoop: new message %+v", message)
tmpCh := make(rabtap.TapChannel)
go func() {
m := <-tmpCh
Expand Down
2 changes: 1 addition & 1 deletion cmd/testgen/testgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func createTopology(ch *amqp.Channel, exchanges []string, numTestQueues int) {
log.Printf("creating queue %s", queueName)
createQueue(ch, queueName)
bindingKey, headers := getBindingKeyForExchange(exchange, i)
log.Printf("binding queue %s to exchange %s with bindingkey `%s` and headers %#+v",
log.Printf("binding queue %s to exchange %s with bindingkey `%s` and headers %+v",
queueName, exchange, bindingKey, headers)
bindQueue(ch, queueName, exchange, bindingKey, headers)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *AmqpPublish) createWorkerFunc(publishChannel PublishChannel) AmqpWorker
*message.Publishing)

if err != nil {
s.logger.Print("publishing error %#+v", err)
s.logger.Printf("publishing error %w", err)
// error publishing message - reconnect.
return doReconnect, err
}
Expand Down
32 changes: 14 additions & 18 deletions pkg/rabbitmq_rest_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,28 +481,24 @@ type RabbitQueue struct {

// RabbitBinding models the /bindings resource of the rabbitmq http api
type RabbitBinding struct {
Source string `json:"source"`
Vhost string `json:"vhost"`
Destination string `json:"destination"`
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
//Arguments struct {
Arguments map[string]interface{} `json:"arguments,omitempty"`
//} `json:"arguments"`
PropertiesKey string `json:"properties_key"`
Source string `json:"source"`
Vhost string `json:"vhost"`
Destination string `json:"destination"`
DestinationType string `json:"destination_type"`
RoutingKey string `json:"routing_key"`
Arguments map[string]interface{} `json:"arguments,omitempty"`
PropertiesKey string `json:"properties_key"`
}

// RabbitExchange models the /exchanges resource of the rabbitmq http api
type RabbitExchange struct {
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments,omitempty"`
//Arguments struct {
//} `json:"arguments"`
Name string `json:"name"`
Vhost string `json:"vhost"`
Type string `json:"type"`
Durable bool `json:"durable"`
AutoDelete bool `json:"auto_delete"`
Internal bool `json:"internal"`
Arguments map[string]interface{} `json:"arguments,omitempty"`
MessageStats struct {
PublishOut int `json:"publish_out"`
PublishOutDetails struct {
Expand Down
3 changes: 0 additions & 3 deletions pkg/testcommon/test_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,6 @@ func PublishTestMessages(t *testing.T, ch *amqp.Channel, numMessages int,
// inject messages into exchange. Each message should become visible
// in the tap-exchange defined above.
for i := 1; i <= numMessages; i++ {
//log.Printf("publishing message to exchange '%s' with routingkey '%s'", exchangeName, routingKey)
// publish the test message
err := ch.Publish(
exchangeName,
Expand Down Expand Up @@ -185,13 +184,11 @@ func VerifyTestMessageOnQueue(t *testing.T, ch *amqp.Channel, consumer string, n
if string(d.Body) == "Hello" {
numReceived++
}
//log.Printf("%s: %d received original message...", consumer, numReceived)

// Await NumExpectedMessages
if numReceived == numExpected {
success <- numReceived
}
}
//log.Printf("%s: Exiting receiver", consumer)
}()
}

0 comments on commit 33403f6

Please sign in to comment.