This repository has been archived by the owner on Aug 30, 2022. It is now read-only.

Merge pull request #54 from ARGOeu/devel
Version 0.1.10
themiszamani authored Jun 19, 2018
2 parents dee1d6d + b219631 commit 1b6dbbf
Showing 9 changed files with 295 additions and 7 deletions.
59 changes: 59 additions & 0 deletions README.md
@@ -6,8 +6,10 @@ Currently, there are probes for:

- ARGO EGI Connectors
- ARGO Messaging service
- ARGO Messaging Nagios publisher
- ARGO Web API
- POEM service
- Compute Engine dataflow

## ARGO Messaging service

@@ -43,6 +45,36 @@ where:
$ ./ams-probe --token T0K3N --host messaging-devel.argo.grnet.gr --project EGI --topic probetest --subscription probetestsub --timeout 30
```

## ARGO Messaging Nagios publisher

This probe inspects the AMS publisher running on Nagios monitoring instances. It
tracks the number of messages published by each spawned worker and raises an alarm if
that number falls below the expected threshold for any worker. It queries the local
inspection socket that the publisher exposes and reports the status back through
NRPE.

The usage is:

```sh
usage: amspub_check.py [-h] -s SOCKET -q QUERY -c THRESHOLD [-t TIMEOUT]
```

where:
- (-s): local path of the publisher's inspection socket
- (-q): a simple query, which can be specified multiple times, consisting of a worker name and an identifier of published or consumed
  messages in a specified minute interval, e.g. `w:metrics+g:published15`
  - `metrics` is the name of the worker that will be inspected
  - `published15` is an identifier designating that the caller is interested in the number of
    messages published in the last 15 minutes
- (-c): threshold corresponding to each query; one threshold must be given per query, in the same order
- (-t): optional timeout in seconds after which the probe will no longer wait for an answer from the socket

### Usage example

```sh
./ams-publisher-probe -s /var/run/argo-nagios-ams-publisher/sock -q 'w:metrics+g:published180' -c 50000 -q 'w:alarms+g:published180' -c 1
```
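
The query syntax described above can be illustrated with a short sketch. This is not the probe's code: `parse_query` and `pair_with_thresholds` are hypothetical helpers, and treating `w:` and `g:` as fixed prefixes is an assumption based only on the example in this README.

```python
def parse_query(query):
    """Split a query such as 'w:metrics+g:published15' into (worker, field)."""
    worker_part, field_part = query.split('+')
    worker_key, worker = worker_part.split(':', 1)
    field_key, field = field_part.split(':', 1)
    # Assumed prefixes, taken from the README example only.
    if worker_key != 'w' or field_key != 'g':
        raise ValueError('unexpected query format: {0}'.format(query))
    return worker, field


def pair_with_thresholds(queries, thresholds):
    """Pair each -q value with the -c value given at the same position."""
    if len(queries) != len(thresholds):
        raise ValueError('each query needs exactly one threshold')
    return [(parse_query(q), t) for q, t in zip(queries, thresholds)]
```

For the usage example above, `pair_with_thresholds(['w:metrics+g:published180', 'w:alarms+g:published180'], [50000, 1])` pairs the `metrics` worker with 50000 and the `alarms` worker with 1.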

## ARGO Web API

This probe checks that AR and status reports are working properly.
@@ -70,3 +102,30 @@ where:
```sh
$ ./web-api -H web-api.test.com --tenant tenantname --rtype ar --token 12321312313123 --unused-reports Report1 Report2 --day 1 -t 180 -v
```

## Compute Engine dataflow

This is a probe for checking the compute engine's dataflow, making sure that all components work as intended.
The probe publishes a message to AMS and expects, after some time, to find the same message produced by the system. If the message is identical and has been delivered in reasonable time, everything is OK; otherwise, the probe examines the result to determine what went wrong with the system.

Usage of the script:
```sh
$ ce_check.py [-h] [-H HOSTNAME] [--project Project] [--token TOKEN]
              [--push_topic Push Topic] [--push_subscription Push Subscription]
              [--pull_subscription Pull Subscription] [-t TIMEOUT] [-i INTERVAL]
```
- (-H): the hostname of the AMS endpoint.
- (--project): the project that holds the topics and subscriptions.
- (--token): the authorization token.
- (--push_topic): the name of the topic where the probe publishes its data.
- (--pull_subscription): the name of the subscription where the probe checks for the system's response.
- (--push_subscription): the name of the subscription that the system reads from.
- (-t): a timeout (in seconds) for AMS library requests.
- (-i): the time window (in seconds) between publishing and retrieving the message that is considered 'healthy' for the system.

### Usage example

```sh
$ ce_check -H ams-endpoint.gr --project TEST_PR --token test_token --push_topic test_topic --pull_subscription test_sub --push_subscription test_sub_2 -t 180 -i 500

```
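
The decision described in this section (identical message, delivered in reasonable time) can be sketched as a pure function. `classify_dataflow` and its return labels are hypothetical; the rule that a missing response becomes critical after twice the interval is taken from `ce_check.py` in this commit, and the `'pending'` label is an assumption standing in for "keep polling".

```python
def classify_dataflow(sent, received, elapsed, interval):
    """Return 'ok', 'warning', 'critical' or 'pending' for one probe run.

    sent / received are the published and retrieved payloads (received is
    None while nothing has arrived); elapsed and interval are in seconds.
    """
    if received is None:
        # No answer yet: give up only after twice the healthy time window.
        return 'critical' if elapsed > 2 * interval else 'pending'
    if sent != received:
        return 'critical'
    if elapsed > interval:
        # Correct message, but it arrived later than the healthy window.
        return 'warning'
    return 'ok'
```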
10 changes: 7 additions & 3 deletions modules/NagiosResponse.py
@@ -1,6 +1,7 @@
class NagiosResponse:
class NagiosResponse(object):
    _msgBagWarning = []
    _msgBagCritical = []
    _msgBagOk = []
    _okMsg = ""
    _code = None

@@ -13,10 +14,12 @@ def __init__(self, ok_msg=""):
        self._code = self.OK
        self._okMsg = ok_msg


    def writeWarningMessage(self, msg):
        self._msgBagWarning.append(msg)

    def writeOkMessage(self, msg):
        self._msgBagOk.append(msg)

    def writeCriticalMessage(self, msg):
        self._msgBagCritical.append(msg)

@@ -32,7 +35,8 @@ def getMsg(self):
        elif self._code == self.CRITICAL:
            return "CRITICAL - " + self._toString(self._msgBagCritical)
        elif self._code == self.OK:
            return "OK - " + self._okMsg if self._okMsg else "OK"
            msg = self._okMsg if self._okMsg else self._toString(self._msgBagOk)
            return "OK - " + msg
        else:
            return "UNKNOWN!"

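
The change to `getMsg` makes the OK branch fall back to the messages collected through `writeOkMessage` when no fixed OK message was given. A minimal sketch of that fallback, written outside the class: `format_ok` is a hypothetical name, and the `' / '` separator is an assumption, since `_toString` is not shown in this diff.

```python
def format_ok(ok_msg, ok_bag, separator=' / '):
    """Build the OK line: prefer the fixed message, else join the OK bag."""
    # separator is assumed; the real joining is done by _toString.
    msg = ok_msg if ok_msg else separator.join(ok_bag)
    return 'OK - ' + msg
```

A fixed message passed to the constructor still wins, so probes that never call `writeOkMessage` should keep their previous output.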
109 changes: 109 additions & 0 deletions modules/amspub_check.py
@@ -0,0 +1,109 @@
#!/usr/bin/env python

import argparse
import socket
from nagios_plugins_argo.NagiosResponse import NagiosResponse

maxcmdlength = 128
timeout = 10

def parse_result(query):
    try:
        w, r = query.split('+')

        w = w.split(':')[1]
        r = int(r.split(':')[1])

    except (ValueError, KeyError):
        return (w, 'error')

    return (w, r)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', dest='socket', required=True, type=str, help='AMS inspection socket')
    parser.add_argument('-q', dest='query', action='append', required=True, type=str, help='Query')
    parser.add_argument('-c', dest='threshold', action='append', required=True, type=int, help='Threshold')
    parser.add_argument('-t', dest='timeout', required=False, type=int, help='Timeout')
    arguments = parser.parse_args()

    nr = NagiosResponse()

    if len(arguments.threshold) != len(arguments.query):
        nr.setCode(2)
        nr.writeCriticalMessage('Wrong arguments')
        print nr.getMsg()
        raise SystemExit(nr.getCode())

    if arguments.timeout:
        timeo = arguments.timeout
    else:
        timeo = timeout

    try:
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.setblocking(0)
        sock.settimeout(timeo)

        sock.connect(arguments.socket)
        sock.send(' '.join(arguments.query), maxcmdlength)
        data = sock.recv(maxcmdlength)

        lr = list()
        for r in data.split():
            lr.append(parse_result(r))

        error = False
        for e in lr:
            if e[1] == 'error':
                nr.setCode(2)
                nr.writeCriticalMessage('Worker {0} {1}'.format(e[0], e[1]))
                error = True
        if error:
            print nr.getMsg()
            raise SystemExit(nr.getCode())

        error = False
        nr.setCode(0)
        i = 0
        while i < len(lr):
            e = lr[i]
            if e[1] < arguments.threshold[i]:
                nr.setCode(2)
                nr.writeCriticalMessage('Worker {0} published {1} (threshold {2})'.format(e[0], e[1], arguments.threshold[i]))
                error = True
            i+=1

        if error:
            print nr.getMsg()
            raise SystemExit(nr.getCode())
        else:
            i = 0
            nr.setCode(0)
            while i < len(lr):
                e = lr[i]
                nr.writeOkMessage('Worker {0} published {1} (threshold {2})'.format(e[0], e[1], arguments.threshold[i]))
                i+=1

            print nr.getMsg()
            raise SystemExit(nr.getCode())


    except socket.timeout as e:
        nr.setCode(2)
        nr.writeCriticalMessage('Socket response timeout after {0}s'.format(timeo))
        print nr.getMsg()
        raise SystemExit(nr.getCode())

    except socket.error as e:
        nr.setCode(2)
        nr.writeCriticalMessage('Socket error: {0}'.format(str(e)))
        print nr.getMsg()
        raise SystemExit(nr.getCode())

    finally:
        sock.close()

if __name__ == "__main__":
    main()
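
The threshold comparison in `main()` can be read as the following sketch, written for Python 3 and separated from the socket handling. `evaluate` is a hypothetical helper, not part of the commit; the exit codes 0 and 2 are the ones the probe passes to `setCode`.

```python
def evaluate(results, thresholds):
    """Compare per-worker counts with their thresholds.

    results is a list of (worker, published_count) tuples in the same order
    as thresholds. Returns (exit_code, messages): code 2 with only the
    failing workers, or code 0 with one message per worker.
    """
    failures = []
    messages = []
    for (worker, count), threshold in zip(results, thresholds):
        msg = 'Worker {0} published {1} (threshold {2})'.format(worker, count, threshold)
        messages.append(msg)
        if count < threshold:
            failures.append(msg)
    if failures:
        return 2, failures
    return 0, messages
```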
87 changes: 87 additions & 0 deletions modules/ce_check.py
@@ -0,0 +1,87 @@
#!/usr/bin/env python

from argparse import ArgumentParser
import datetime
import time
import json
from argo_ams_library import ArgoMessagingService, AmsException
from NagiosResponse import NagiosResponse


def main():
    TIMEOUT = 180
    INTERVAL = 300

    parser = ArgumentParser(description="Nagios probe for monitoring the compute engine's flow.")
    parser.add_argument('-H', dest='host', type=str, default='msg-devel.argo.grnet.gr', help='FQDN of AMS Service')
    parser.add_argument('--token', type=str, required=True, help='Given token')
    parser.add_argument('--project', type=str, required=True, help='Project registered in AMS Service')
    parser.add_argument('--push_topic', type=str, default='create_data', help='Given topic')
    parser.add_argument('--push_subscription', type=str, default='create_data_sub', help='Push_Subscription name')
    parser.add_argument('--pull_subscription', type=str, default='retrieve_data_sub', help='Push_Subscription name')
    parser.add_argument('-t', dest='timeout', type=int, default=TIMEOUT, help='Timeout for ams calls')
    parser.add_argument('-i', dest='interval', type=int, default=INTERVAL, help='The amount of time the probe should try to read from ams, beforing exiting')

    cmd_options = parser.parse_args()

    run_timestamp = str(datetime.datetime.now())

    nagios = NagiosResponse("System Dataflow at " + run_timestamp + " completed successfully.")
    ams = ArgoMessagingService(endpoint=cmd_options.host, token=cmd_options.token, project=cmd_options.project)
    try:
        # For both subscriptions move their offset to max
        move_sub_offset_to_max(ams, cmd_options.push_subscription, timeout=cmd_options.timeout)
        move_sub_offset_to_max(ams, cmd_options.pull_subscription, timeout=cmd_options.timeout)

        # publish a message with the current timestamp as its content
        req_data = {'message': run_timestamp, 'errors': []}
        d1 = {'data': json.dumps(req_data), 'attributes': {}}
        ams.publish(cmd_options.push_topic, d1, timeout=cmd_options.timeout)
        start = time.time()
        no_resp = True
        while no_resp:
            end = time.time()
            # check if the systsem has written to the retrieve topic
            resp = ams.pull_sub(cmd_options.pull_subscription, timeout=cmd_options.timeout)
            if len(resp) > 0:
                no_resp = False
                resp_data = json.loads(resp[0][1]._data)
                # check if the submitted and retrieved data differ
                if req_data != resp_data:
                    nagios_report(nagios, 'critical', "System Dataflow at " + run_timestamp + " completed with errors. Expected: " + str(req_data) + ". Found: " + str(resp_data)+".")
                # check if data was retrieved within the expected timeout period, BUT had some kind of delay
                elif req_data == resp_data and end-start > cmd_options.interval:
                    nagios_report(nagios, 'warning', "System Dataflow at " + run_timestamp + " completed successfully using an extra time of: " + str((end-start)-cmd_options.interval) + "s.")

            if (end-start) > 2 * cmd_options.interval:
                nagios_report(nagios, 'critical', "System Dataflow at " + run_timestamp + " returned with no message from the systsem after " + str(2 * cmd_options.interval) + "s.")

            # check for a response every 10 seconds
            time.sleep(10)

        print(nagios.getMsg())
        raise SystemExit(nagios.getCode())

    except AmsException as e:
        nagios_report(nagios, 'critical', e.msg)


def nagios_report(nagios, status, msg):
    nagios_method = getattr(nagios, "write{0}Message".format(status.capitalize()))
    nagios_method(msg)
    nagios_status = getattr(nagios, status.upper())
    nagios.setCode(nagios_status)
    if status == 'critical':
        print(nagios.getMsg())
        raise SystemExit(nagios.getCode())


def move_sub_offset_to_max(ams, sub, **reqkwargs):
    # Retrieve the max offset for the given subscription
    max_sub_offset = ams.getoffsets_sub(sub, "max", **reqkwargs)
    # Move the current offset to the max position
    ams.modifyoffset_sub(sub, max_sub_offset, **reqkwargs)


if __name__ == "__main__":
    main()
20 changes: 18 additions & 2 deletions modules/connectors.py
@@ -15,7 +15,7 @@
weights_state = 'weights-ok'

def check_file_ok(fname):
    if os.path.isfile(fname):
    if os.stat(fname) and os.path.isfile(fname):
        fh = open(fname, 'r')
        if fh.read().strip() == 'True':
            return True
@@ -87,7 +87,23 @@ def main():
        date_sufix.append(day.strftime("%Y_%m_%d"))

    nagios = NagiosResponse("All connectors are working fine.")
    process_customer(cmd_options, root_directory, date_sufix, nagios)
    try:
        process_customer(cmd_options, root_directory, date_sufix, nagios)

    except OSError as e:
        nagios.setCode(nagios.CRITICAL)
        if getattr(e, 'filename', False):
            nagios.writeCriticalMessage('{0} {1}'.format(repr(e), e.filename))
        else:
            nagios.writeCriticalMessage(repr(e))
        print nagios.getMsg()
        raise SystemExit(nagios.getCode())

    except Exception as e:
        nagios.setCode(nagios.CRITICAL)
        nagios.writeCriticalMessage(repr(e))
        print nagios.getMsg()
        raise SystemExit(nagios.getCode())

    print(nagios.getMsg())
    raise SystemExit(nagios.getCode())
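
With the added `os.stat` call, a missing state file raises `OSError` instead of being treated as "not a file", and `main()` turns that exception into a CRITICAL result that names the file. A Python 3 sketch of the same pattern follows; `read_state` and `safe_read_state` are hypothetical names, not functions from `connectors.py`.

```python
import os


def read_state(fname):
    """Return True if fname exists and contains the string 'True'."""
    os.stat(fname)  # raises OSError (FileNotFoundError) when fname is missing
    with open(fname, 'r') as fh:
        return fh.read().strip() == 'True'


def safe_read_state(fname):
    """Return (state, error_text); error_text is None when the read worked."""
    try:
        return read_state(fname), None
    except OSError as e:
        # Same message shape as the probe: repr of the error plus the file name.
        return None, '{0!r} {1}'.format(e, e.filename)
```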
4 changes: 3 additions & 1 deletion nagios-plugins-argo.spec
@@ -4,7 +4,7 @@

Name: nagios-plugins-argo
Summary: ARGO components related probes.
Version: 0.1.8
Version: 0.1.10
Release: 1%{?dist}
License: ASL 2.0
Source0: %{name}-%{version}.tar.gz
@@ -44,6 +44,8 @@ rm -rf %{buildroot}


%changelog
* Tue Mar 27 2018 Daniel Vrcic <[email protected]> - 0.1.9-1%{?dist}
- added argo-nagios-ams-publisher
* Mon Dec 4 2017 Daniel Vrcic <[email protected]> - 0.1.8-1%{?dist}
- connectors-probe warning logic revised
- connectors-probe updated global.conf parsing
5 changes: 5 additions & 0 deletions src/ams-publisher-probe
@@ -0,0 +1,5 @@
#!/usr/bin/env python

from nagios_plugins_argo import amspub_check

amspub_check.main()
6 changes: 6 additions & 0 deletions src/ce_check
@@ -0,0 +1,6 @@
#!/usr/bin/env python

from nagios_plugins_argo import ce_check

ce_check.main()

2 changes: 1 addition & 1 deletion src/check_nagios
@@ -211,7 +211,7 @@ foreach my $service (@services) {
$state = CRITICAL;
last;
} else {
my $diff = ($service->{last_update} - $service->{last_check})/1000;
my $diff = time() - $service->{last_check}/1000;
if ($diff > $plugin->opts->age) {
$answer .= "Service $host/".$service->{description}." on Nagios $nagios is older than ".$plugin->opts->age." seconds, last check was $diff seconds ago.";
$state = CRITICAL;
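
The changed line measures the age of a check against the current time rather than against `last_update`. The same arithmetic as a Python sketch, assuming (as the division by 1000 suggests) that `last_check` is a timestamp in milliseconds; `check_age` and `is_stale` are hypothetical names, not part of `check_nagios`.

```python
import time


def check_age(last_check_ms, now=None):
    """Seconds elapsed since last_check_ms (milliseconds since the epoch)."""
    if now is None:
        now = time.time()
    return now - last_check_ms / 1000.0


def is_stale(last_check_ms, max_age, now=None):
    """True when the last check is older than max_age seconds."""
    return check_age(last_check_ms, now) > max_age
```

Under the previous formula, a service whose checks and updates both stopped at the same moment would always show a difference near zero; comparing against the current time makes the age grow until the next check runs.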
