-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsend_to_stomp
executable file
·137 lines (108 loc) · 4.73 KB
/
send_to_stomp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#!/usr/bin/env python
from __future__ import print_function
import sys, json, os, datetime, re, Queue, threading, time, optparse, base64
from select import select
from stompclient import StompClient
import sys
class ExitToken:
    """Marker type whose single instance tells the sender thread to stop."""
# Sentinel placed on the queue by the stdin reader to stop the sender thread.
exit_token = ExitToken()

# Bounded hand-off queue between the stdin reader and the sender thread.
q = Queue.Queue(1000)

# Default for the 'host' field. The pipe is closed explicitly so the file
# object is not left open for the lifetime of the process.
_hostname_pipe = os.popen("hostname")
try:
    hostname = _hostname_pipe.read().strip()
finally:
    _hostname_pipe.close()

parser = optparse.OptionParser(usage="Usage: %prog [options] <stomp host:port>")
# Value placeholders are supplied through `metavar`; embedding them in the
# option string would make the placeholder part of the option's name.
parser.add_option('-m', '--message-offset', action="store", dest="offset", metavar="OFFSET", help="Offset of first byte of message (if this byte is a space or tab, the line will be added to the previous event)", default=0, type="int")
parser.add_option('-n', '--host', action="store", dest="hostname", metavar="NAME", help="Override 'host' field (defaults to %s)" % hostname, default=hostname)
parser.add_option('-e', '--exchange', action="store", dest="exchange", metavar="NAME", help="Destination exchange")
# NOTE(review): the help text promises an empty string but the default is
# None; left unchanged because the STOMP client's handling of None is not
# visible here - confirm which one is intended.
parser.add_option('-k', '--routing-key', action="store", dest="routing_key", metavar="NAME", help="Routing key (defaults to empty string)", default=None)
parser.add_option('-v', '--virtual-host', action="store", dest="vhost", metavar="NAME", help="Virtual host (defaults to '/')", default="/")
parser.add_option('-u', '--credentials', action="store", dest="credentials", metavar="USERNAME:PASSWORD", help="Username:password used to log in to the STOMP broker")
parser.add_option('-c', '--cut', action="store_true", dest="cut", help="Discard bytes from start of line until OFFSET", default=False)
parser.add_option('-b', '--heartbeat', action="store", dest="heartbeat", metavar="INTERVAL", help="Interval in seconds to output a heartbeat to stderr", default=60, type="int")

options, args = parser.parse_args()
if not options.exchange:
    parser.error('Exchange not given')
if not args:
    parser.error('Stomp host:port not given')
def eprint(*args, **kwargs):
    """Print to standard error; takes the same arguments as print()."""
    stream = sys.stderr
    print(*args, file=stream, **kwargs)
def log_json(name, props=None):
    """Write one JSON-encoded log record to standard error.

    name  -- value of the record's 'name' field.
    props -- optional dict of extra fields to include; it is copied, never
             modified.

    The fixed fields ('timestamp', 'ns', 'hostname', 'name') are applied last,
    so they take precedence over same-named keys in `props`.
    """
    # A None default avoids sharing one mutable dict between calls.
    record = dict(props) if props else {}
    record.update({
        'timestamp': datetime.datetime.now().isoformat(),
        'ns': 'forwarder.stomp',
        'hostname': hostname,
        'name': name,
    })
    eprint(json.dumps(record))
# Announce start-up on stderr together with the effective option values.
# NOTE(review): this includes the raw --credentials value, so the password is
# written to stderr in clear text - consider redacting it.
log_json('starting', dict(vars(options)))

# The positional argument is the broker address in "host:port" form; report a
# malformed value as a usage error instead of an IndexError/ValueError traceback.
stomp_address = args[0].split(":")
try:
    stomp_port = int(stomp_address[1])
except (IndexError, ValueError):
    parser.error('Stomp address must be given as host:port')

# Credentials are required below; fail with a usage error when they are
# missing or have no ':' separator rather than crashing on None.split().
if not options.credentials or ":" not in options.credentials:
    parser.error('Credentials must be given as USERNAME:PASSWORD')
# Split on the first colon only so the password itself may contain ':'.
cred = options.credentials.split(":", 1)

stomp = StompClient(stomp_address[0], stomp_port)
stomp.connect(cred[0], cred[1], options.vhost)
# Leading "YYYY-MM-DD hh:mm:ss" followed by '.' or ',', optional fractional
# digits and an optional +HHMM/-HHMM offset.
_LINE_TIMESTAMP_RE = re.compile(r'(\d\d\d\d-\d\d-\d\d[ T]\d\d:\d\d:\d\d[.,])(\d*)([+-]\d\d\d\d)?.*')


def _local_utc_offset():
    """Return the local zone's current UTC offset formatted as +HHMM or -HHMM."""
    # time.altzone only applies while daylight saving time is in effect;
    # otherwise time.timezone holds the offset. Both are seconds WEST of UTC,
    # hence the inverted sign.
    dst_active = time.daylight and time.localtime().tm_isdst > 0
    seconds_west = time.altzone if dst_active else time.timezone
    sign = "-" if seconds_west > 0 else "+"
    hours, minutes = divmod(abs(seconds_west) // 60, 60)
    return "%s%02d%02d" % (sign, hours, minutes)


def to_event(event):
    """Convert a queued line record into the dict that is sent as JSON.

    event -- dict with 'line' (the text) and 'time' (arrival time as produced
             by datetime.now().isoformat()).

    Returns a dict with "@timestamp", "host" and "message". The timestamp is
    taken from the start of the line when it matches the expected pattern
    (fraction cut to milliseconds, "+0000" assumed when the line carries no
    offset); otherwise the arrival time with the local UTC offset is used.
    """
    try:
        match = _LINE_TIMESTAMP_RE.match(event['line'])
        if match:
            timestamp = match.group(1) + match.group(2)[0:3]
            timestamp += match.group(3) if match.group(3) else "+0000"
            # Normalise "date time" to "dateTtime".
            if timestamp[10] == ' ':
                timestamp = timestamp[0:10] + 'T' + timestamp[11:]
        else:
            arrival = event['time']
            # isoformat() omits the fraction when microseconds are zero; only
            # trim microseconds to milliseconds when a fraction is present.
            if '.' in arrival:
                arrival = arrival[0:-3]
            timestamp = arrival + _local_utc_offset()
    except ValueError:
        # Fall back to the raw arrival time if timestamp formatting fails.
        timestamp = event['time']
    return {
        "@timestamp": timestamp,
        "host": options.hostname,
        "message": event_line(event['line'].strip()),
    }
def starts_with_space(line):
    """Tell whether the character at the message offset is a space or a tab.

    Lines too short to reach the offset are reported as False.
    """
    offset = options.offset
    if len(line) <= offset:
        return False
    return line[offset] in (' ', '\t')
def event_line(line):
    """Return the text to forward: the line from the message offset onwards
    when --cut is set, otherwise the line unchanged."""
    if not options.cut:
        return line
    return line[options.offset:]
def sending():
    """Sender-thread body: group queued lines into events and send them.

    Takes records from `q`, waiting up to one second for each. A line whose
    character at the message offset is a space or tab is appended to the
    pending event. Any other line, an idle timeout, or the exit token flushes
    the pending event as one JSON payload via stomp.send(). Returns once the
    exit token has been processed.
    """
    pending = None
    event = None
    while event is not exit_token:
        try:
            event = q.get(True, 1)
        except Queue.Empty:
            # Idle: nothing arrived within a second, so flush what we have.
            event = None

        is_line = event is not None and event is not exit_token

        # Only merge when there is an event to merge into. A continuation line
        # with nothing pending (first line of input, or one arriving after an
        # idle flush) starts a new event instead of crashing the thread.
        if is_line and pending is not None and starts_with_space(event['line']):
            pending['line'] += "\n" + event_line(event['line'])
            continue

        if pending is not None:
            payload = json.dumps(to_event(pending))
            stomp.send(options.exchange, options.routing_key, payload)
            pending = None

        if is_line:
            pending = event
# Run the sender as a daemon thread so it never blocks interpreter exit.
t = threading.Thread(target=sending)
t.daemon = True
t.start()

try:
    last_heartbeat = time.time()
    while True:
        # Poll stdin with a short timeout so the heartbeat still fires while
        # no input is arriving.
        # NOTE(review): select() only sees the OS-level descriptor; if
        # readline() buffers more than one line, the remaining lines may wait
        # until new input arrives - confirm.
        rlist, _, _ = select([sys.stdin], [], [], 0.1)
        if rlist:
            line = sys.stdin.readline()
            if not line:
                # End of input: tell the sender to flush and stop.
                q.put(exit_token)
                break
            # Drop only a real trailing newline; a final line without one
            # must not lose its last character.
            if line.endswith('\n'):
                line = line[:-1]
            if line:
                q.put({'time': datetime.datetime.now().isoformat(), 'line': line})
        now = time.time()
        if now - last_heartbeat >= options.heartbeat:
            log_json('heartbeat')
            last_heartbeat += options.heartbeat
    # Wait for the sender to process the exit token before exiting.
    t.join()
except KeyboardInterrupt:
    pass