-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtailmq.go
156 lines (131 loc) · 4.18 KB
/
tailmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package main
import (
"flag"
"fmt"
"log"
"os"
"io/ioutil"
"net/url"
"strings"
"time"
"github.com/olaurendeau/tailmq/consumer"
"github.com/streadway/amqp"
"github.com/logrusorgru/aurora"
)
const Help = `DESCRIPTION
TailMQ tail AMQP exchanges and output messages in stdout
USAGE
tailmq [options] <exchange_name> (<routing_key>)
EXAMPLES
tailmq amp.direct - Tail exchange amp.direct on local server with default access
tailmq amp.topic flower.# - Tail exchange amp.topic based on routing_key flower.#, will catch messages flower.tulip, flower.rose but not tool.chain
tailmq -uri=amqp://user:[email protected]:5672//awesome amp.topic - Tail exchange amp.topic from server tailmq.com in vhost /awesome
tailmq -server=prod amp.fanout - Tail exchange amp.fanout from server prod configured in file ~/.tailmq
tailmq -server=prod -vhost=/foobar amp.fanout - Tail exchange amp.fanout from server prod configured in file ~/.tailmq but use vhost /foobar
OPTIONS
`
type Config struct {
uri *string
server *string
vhost *string
verbose *bool
prefix *bool
header *bool
help *bool
exchangeName string
routingKey string
globalConfigFilePath *string
noColors *bool
global GlobalConfig
}
func main() {
var err error
config := new(Config)
config.uri = flag.String("uri", "amqp://guest:guest@localhost:5672/", "RabbitMQ amqp uri")
config.server = flag.String("server", "", "Use predefined server from configuration")
config.vhost = flag.String("vhost", "", "Define vhost to tail from")
config.prefix = flag.Bool("prefix", false, "Should output be prefixed with datetime and time")
config.header = flag.Bool("header", false, "Should output display headers")
config.verbose = flag.Bool("verbose", false, "Do you want more informations ?")
config.help = flag.Bool("help", false, "How does it work ?")
config.globalConfigFilePath = flag.String("config", "", "Path of the global config file to use")
config.noColors = flag.Bool("no-colors", false, "If you don't want a colorful life :'(")
flag.Parse()
config.global, err = NewGlobalConfig(*config.globalConfigFilePath)
failOnError(err, "Fail retrieving server list")
configureLogger(config)
displayHelp(config)
setUri(config)
if flag.NArg() == 0 {
log.Fatalf("Please choose an exchange to listen from")
} else if flag.NArg() == 1 {
config.exchangeName = flag.Arg(0)
} else if flag.NArg() == 2 {
config.exchangeName = flag.Arg(0)
config.routingKey = flag.Arg(1)
} else {
log.Fatalf("Not yet available")
}
c := consumer.New(*config.uri, config.exchangeName, config.routingKey)
go c.Start()
defer c.Stop()
for {
select {
case d := <-c.Deliveries:
printDelivery(d, config)
case err := <-c.Err:
failOnError(err, "Fail consuming")
}
}
}
func printDelivery(d amqp.Delivery, config *Config) {
au := aurora.NewAurora(!*config.noColors)
if *config.prefix {
fmt.Printf("%s", au.Green(fmt.Sprintf("[%s]", time.Now().Format("2006-01-02 15:04:05"))))
if d.RoutingKey != "" {
fmt.Printf(" %s ", au.Green(d.RoutingKey))
}
fmt.Printf("\n")
}
if *config.header {
for k,v := range(d.Headers) {
fmt.Printf("%s", au.Cyan(fmt.Sprintf("Header : %s=%s\n", k, v)))
}
}
fmt.Printf("%s\n", d.Body)
}
func configureLogger(config *Config) {
if (!*config.verbose) {
log.SetOutput(ioutil.Discard)
}
}
func setUri(config *Config) {
if (*config.server != "") {
server, err := config.global.getServerUri(*config.server)
failOnError(err, "Failed to find server configuration")
*config.uri = server
}
if (*config.vhost != "") {
parsedUri, err := url.Parse(*config.uri)
failOnError(err, "Failed to parse uri")
// If vhost start with a single slash it would be removed by Uri String() so we double it
if (strings.Index(*config.vhost, "/") == 0) {
parsedUri.Path = "/" + *config.vhost
} else {
parsedUri.Path = *config.vhost
}
*config.uri = parsedUri.String()
}
}
func displayHelp(config *Config) {
if *config.help {
fmt.Print(Help)
flag.PrintDefaults()
os.Exit(0)
}
}
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}