Skip to content

Commit

Permalink
Add message received timestamp to message JSON (#12)
Browse files Browse the repository at this point in the history
* record timestamp when a message was received (preparation for #10)
* new JSON field XRabtapReceivedTimestamp recording timestamp when a message was received added
* change uuid library to githib.com/google/uuid
* fix linting erros
* simplified code
  • Loading branch information
jandelgado authored Jun 13, 2019
1 parent 4c71c1c commit 76360a7
Show file tree
Hide file tree
Showing 19 changed files with 173 additions and 215 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@

# Changelog for rabtap

## v1.17 (2019-06-13)

* Timestamp when message was received by rabtap now stored in JSON format
in `XRabtapReceivedTimestamp` field.
* Simplified code

## v1.16 (2019-04-03)

* new option `--by-connection` for info command added, making `info` show
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,7 @@ messages in the following format:
"Redelivered": false,
"Exchange": "amq.topic",
"RoutingKey": "test-q-amq.topic-0",
"XRabtapReceivedTimestamp": "2019-06-13T19:33:51.920711583+02:00",
"Body": "dGhpcyB0ZXN0IG1lc3NhZ2U .... IGFuZCBoZWFkZXJzIGFtcXAuVGFibGV7fQ=="
}
...
Expand Down
7 changes: 4 additions & 3 deletions cmd/rabtap/cmd_subscribe_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/jandelgado/rabtap/pkg/testcommon"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
Expand All @@ -27,7 +28,7 @@ func TestCmdSubFailsEarlyWhenBrokerIsNotAvailable(t *testing.T) {
amqpURI: "invalid uri",
queue: "queue",
tlsConfig: &tls.Config{},
messageReceiveFunc: func(*amqp.Delivery) error { return nil },
messageReceiveFunc: func(rabtap.TapMessage) error { return nil },
signalChannel: make(chan os.Signal, 1)})
done <- true
}()
Expand All @@ -49,9 +50,9 @@ func TestCmdSub(t *testing.T) {
amqpURI := testcommon.IntegrationURIFromEnv()

done := make(chan bool)
receiveFunc := func(message *amqp.Delivery) error {
receiveFunc := func(message rabtap.TapMessage) error {
log.Debug("test: received message: #+v", message)
if string(message.Body) == testMessage {
if string(message.AmqpMessage.Body) == testMessage {
done <- true
}
return nil
Expand Down
2 changes: 0 additions & 2 deletions cmd/rabtap/cmd_tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

package main

// subscribe cli command handler

import (
"crypto/tls"
"os"
Expand Down
4 changes: 2 additions & 2 deletions cmd/rabtap/cmd_tap_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ func TestCmdTap(t *testing.T) {

// receiveFunc must receive messages passed through tapMessageChannel
done := make(chan bool)
receiveFunc := func(message *amqp.Delivery) error {
receiveFunc := func(message rabtap.TapMessage) error {
log.Debug("received message on tap: #+v", message)
if string(message.Body) == "Hello" {
if string(message.AmqpMessage.Body) == "Hello" {
done <- true
}
return nil
Expand Down
6 changes: 3 additions & 3 deletions cmd/rabtap/default_message_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@

package main

import "github.com/streadway/amqp"
import rabtap "github.com/jandelgado/rabtap/pkg"

// DefaultMessageFormatter is the standard message.
type DefaultMessageFormatter struct{}

// Format just returns the message body as string, no formatting applied.
func (s DefaultMessageFormatter) Format(message *amqp.Delivery) string {
return string(message.Body)
func (s DefaultMessageFormatter) Format(message rabtap.TapMessage) string {
return string(message.AmqpMessage.Body)
}
22 changes: 11 additions & 11 deletions cmd/rabtap/json_message_formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"encoding/json"
"strings"

"github.com/streadway/amqp"
rabtap "github.com/jandelgado/rabtap/pkg"
)

// JSONMessageFormatter pretty prints JSON formatted messages.
Expand All @@ -22,21 +22,21 @@ var (
// Format validates and formats a message in JSON format. The body can be a
// simple JSON object or an array of JSON objecs. If the message is not valid
// JSON, it will returned unformatted as-is.
func (s JSONMessageFormatter) Format(d *amqp.Delivery) string {
func (s JSONMessageFormatter) Format(message rabtap.TapMessage) string {

var message []byte
originalMessage := strings.TrimSpace(string(d.Body))
var formatted []byte
originalMessage := strings.TrimSpace(string(message.AmqpMessage.Body))
if originalMessage[0] == '[' {
// try to unmarshal array to JSON objects
var arrayJSONObj []map[string]interface{}
err := json.Unmarshal([]byte(originalMessage), &arrayJSONObj)
if err != nil {
return string(d.Body)
return string(message.AmqpMessage.Body)
}
// pretty print JSON
message, err = json.MarshalIndent(arrayJSONObj, "", " ")
formatted, err = json.MarshalIndent(arrayJSONObj, "", " ")
if err != nil {
return string(d.Body)
return string(message.AmqpMessage.Body)
}
} else {

Expand All @@ -45,13 +45,13 @@ func (s JSONMessageFormatter) Format(d *amqp.Delivery) string {
err := json.Unmarshal([]byte(originalMessage), &simpleJSONObj)

if err != nil {
return string(d.Body)
return string(message.AmqpMessage.Body)
}

message, err = json.MarshalIndent(simpleJSONObj, "", " ")
formatted, err = json.MarshalIndent(simpleJSONObj, "", " ")
if err != nil {
return string(d.Body)
return string(message.AmqpMessage.Body)
}
}
return string(message)
return string(formatted)
}
10 changes: 6 additions & 4 deletions cmd/rabtap/json_message_formatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ package main

import (
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
)
Expand All @@ -14,7 +16,7 @@ func TestJSONFormatterInvalidArray(t *testing.T) {
message := amqp.Delivery{
Body: []byte("[ {\"a\":1} "),
}
formattedMessage := JSONMessageFormatter{}.Format(&message)
formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, nil, time.Now()))
// message is expected to be returned untouched
assert.Equal(t, "[ {\"a\":1} ", formattedMessage)
}
Expand All @@ -24,7 +26,7 @@ func TestJSONFormatterValidArray(t *testing.T) {
message := amqp.Delivery{
Body: []byte(" [ {\"a\":1} ] "),
}
formattedMessage := JSONMessageFormatter{}.Format(&message)
formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, nil, time.Now()))
assert.Equal(t, "[\n {\n \"a\": 1\n }\n]", formattedMessage)
}

Expand All @@ -33,7 +35,7 @@ func TestJSONFormatterInvalidObject(t *testing.T) {
message := amqp.Delivery{
Body: []byte("[ {\"a\":1 "),
}
formattedMessage := JSONMessageFormatter{}.Format(&message)
formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, nil, time.Now()))
// message is expected to be returned untouched
assert.Equal(t, "[ {\"a\":1 ", formattedMessage)
}
Expand All @@ -43,6 +45,6 @@ func TestJSONFormatterValidObject(t *testing.T) {
message := amqp.Delivery{
Body: []byte(" {\"a\":1} "),
}
formattedMessage := JSONMessageFormatter{}.Format(&message)
formattedMessage := JSONMessageFormatter{}.Format(rabtap.NewTapMessage(&message, nil, time.Now()))
assert.Equal(t, "{\n \"a\": 1\n}", formattedMessage)
}
71 changes: 28 additions & 43 deletions cmd/rabtap/message_printer.go
Original file line number Diff line number Diff line change
@@ -1,49 +1,44 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

import (
"encoding/json"
"io"
"text/template"

"github.com/streadway/amqp"
rabtap "github.com/jandelgado/rabtap/pkg"
)

// messageTemplate is the default template to print a message
const messageTemplate = `------ {{ .Title }} ------
exchange.......: {{ ExchangeColor .Message.Exchange }}
{{with .Message.RoutingKey}}routingkey.....: {{ KeyColor .}}
{{end}}{{with .Message.Priority}}priority.......: {{.}}
{{end}}{{with .Message.Expiration}}expiration.....: {{.}}
{{end}}{{with .Message.ContentType}}content-type...: {{.}}
{{end}}{{with .Message.ContentEncoding}}content-enc....: {{.}}
{{end}}{{with .Message.MessageId}}app-message-id.: {{.}}
{{end}}{{if not .Message.Timestamp.IsZero}}app-timestamp..: {{ .Message.Timestamp }}
{{end}}{{with .Message.Type}}app-type.......: {{.}}
{{end}}{{with .Message.CorrelationId}}app-corr-id....: {{.}}
{{end}}{{with .Message.Headers}}app-headers....: {{.}}
// TODO allow externalization of template
const messageTemplate = `------ message received on {{ .Message.ReceivedTimestamp.Format "2006-01-02T15:04:05Z07:00" }} ------
exchange.......: {{ ExchangeColor .Message.AmqpMessage.Exchange }}
{{with .Message.AmqpMessage.RoutingKey}}routingkey.....: {{ KeyColor .}}
{{end}}{{with .Message.AmqpMessage.Priority}}priority.......: {{.}}
{{end}}{{with .Message.AmqpMessage.Expiration}}expiration.....: {{.}}
{{end}}{{with .Message.AmqpMessage.ContentType}}content-type...: {{.}}
{{end}}{{with .Message.AmqpMessage.ContentEncoding}}content-enc....: {{.}}
{{end}}{{with .Message.AmqpMessage.MessageId}}app-message-id.: {{.}}
{{end}}{{if not .Message.AmqpMessage.Timestamp.IsZero}}app-timestamp..: {{ .Message.AmqpMessage.Timestamp }}
{{end}}{{with .Message.AmqpMessage.Type}}app-type.......: {{.}}
{{end}}{{with .Message.AmqpMessage.CorrelationId}}app-corr-id....: {{.}}
{{end}}{{with .Message.AmqpMessage.Headers}}app-headers....: {{.}}
{{end -}}
{{ MessageColor .Body }}
`

// PrintMessageInfo holds info for template
type PrintMessageInfo struct {
// Title to print
Title string
// Message receveived
Message amqp.Delivery
Message rabtap.TapMessage
// formatted body
Body string
// formatted headers
Headers string
}

// MessageFormatter formats the body of ampq.Delivery objects according to its
// type
// MessageFormatter formats the body of tapped message
type MessageFormatter interface {
Format(message *amqp.Delivery) string
Format(message rabtap.TapMessage) string
}

// Registry of available message formatters. Key is contentType
Expand All @@ -55,36 +50,26 @@ func RegisterMessageFormatter(contentType string, formatter MessageFormatter) {
messageFormatters[contentType] = formatter
}

// NewMessageFormatter return a message formatter suitable for the given
// message type, determined by the message headers content type.
func NewMessageFormatter(message *amqp.Delivery) MessageFormatter {
if formatter, ok := messageFormatters[message.ContentType]; ok {
// NewMessageFormatter return a message formatter suitable the given
// contentType.
func NewMessageFormatter(contentType string) MessageFormatter {
if formatter, ok := messageFormatters[contentType]; ok {
return formatter
}
return DefaultMessageFormatter{}
}

// PrettyPrintMessage formats and prints a amqp.Delivery message
func PrettyPrintMessage(out io.Writer, message *amqp.Delivery,
title string, noColor bool) error {
// PrettyPrintMessage formats and prints a tapped message
func PrettyPrintMessage(out io.Writer, message rabtap.TapMessage,
noColor bool) error {

colorizer := NewColorPrinter(noColor)

// get mesagge formatter according to message type
formatter := NewMessageFormatter(message)
formatter := NewMessageFormatter(message.AmqpMessage.ContentType)

// nicely print headers as JSON for better readability
headers, err := json.Marshal(message.Headers)
if err != nil {
return err
}

body := formatter.Format(message)
printStruct := PrintMessageInfo{
Title: title,
Message: *message,
Body: body,
Headers: string(headers),
Message: message,
Body: formatter.Format(message),
}
t := template.Must(template.New("message").
Funcs(colorizer.GetFuncMap()).Parse(messageTemplate))
Expand Down
19 changes: 12 additions & 7 deletions cmd/rabtap/message_printer_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (C) 2017 Jan Delgado
// Copyright (C) 2017-2019 Jan Delgado

package main

Expand All @@ -7,16 +7,17 @@ import (
"testing"
"time"

rabtap "github.com/jandelgado/rabtap/pkg"
"github.com/streadway/amqp"
"github.com/stretchr/testify/assert"
)

func TestNewMessageFormatter(t *testing.T) {

assert.Equal(t, JSONMessageFormatter{},
NewMessageFormatter(&amqp.Delivery{ContentType: "application/json"}))
NewMessageFormatter("application/json"))
assert.Equal(t, DefaultMessageFormatter{},
NewMessageFormatter(&amqp.Delivery{ContentType: "unknown"}))
NewMessageFormatter("unknown"))
}

func ExamplePrettyPrintMessage() {
Expand All @@ -36,10 +37,12 @@ func ExamplePrettyPrintMessage() {
Body: []byte("simple test message"),
}

_ = PrettyPrintMessage(os.Stdout, &message, "title", true)
ts := time.Date(2019, time.June, 6, 23, 0, 0, 0, time.UTC)
noColor := true
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), noColor)

// Output:
// ------ title ------
// ------ message received on 2019-06-06T23:00:00Z ------
// exchange.......: exchange
// routingkey.....: routingkey
// priority.......: 99
Expand All @@ -62,10 +65,12 @@ func ExamplePrettyPrintMessage_withFilteredAtributes() {
Body: []byte("simple test message"),
}

_ = PrettyPrintMessage(os.Stdout, &message, "title", true)
noColor := true
ts := time.Date(2019, time.June, 6, 23, 0, 0, 0, time.UTC)
_ = PrettyPrintMessage(os.Stdout, rabtap.NewTapMessage(&message, nil, ts), noColor)

// Output:
// ------ title ------
// ------ message received on 2019-06-06T23:00:00Z ------
// exchange.......: exchange
// simple test message
//
Expand Down
Loading

0 comments on commit 76360a7

Please sign in to comment.