-
Notifications
You must be signed in to change notification settings - Fork 1
/
thyme_worker
executable file
·84 lines (79 loc) · 2.67 KB
/
thyme_worker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#!/usr/bin/env python2
import pika, logging
from subprocess import call
from os import chmod,remove,chdir
from time import sleep,time
from argparse import ArgumentParser
backoff = 1
script = "job.sh"
# Logging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s %(levelname)s:%(name)s: %(message)s")
logging.getLogger("pika").setLevel(logging.WARNING)
log = logging.getLogger(__name__)
# Command line parameters
argp = ArgumentParser(description="Carrot worker. Use RabbitMQ to execute jobs.")
argp.add_argument("--rabbitmq", dest="rabbitmq")
argp.add_argument("--workdir", "-w", dest="workdir", default="/")
argp.add_argument("--max-backoff", "-m", dest="max_backoff", default=16, type=int)
argp.add_argument("--timeout", "-t", dest="timeout", default=120, type=int)
argp.add_argument("--queue", "-q", dest="queue")
args = argp.parse_args()
if not args.queue:
argp.error("Please specify a RabbitMQ queue explicitly with --queue/-q")
if not args.rabbitmq:
argp.error("Please specify a RabbitMQ host:port explicitly with --rabbitmq")
else:
rmq_host = args.rabbitmq
rmq_port = 5672
if ":" in args.rabbitmq:
rmq_host,rmq_port = args.rabbitmq.split(":", 1)
try:
rmq_port = int(rmq_port)
except ValueError as e:
argp.error("Malformed port: %s" % rmq_port)
timeout_at = time()+args.timeout
log.info("Connecting to RabbitMQ at %s:%d (queue: %s) and using workdir %s" % \
(rmq_host, rmq_port, args.queue, args.workdir))
log.info("Maximum backoff: %d s, timeout: %d s" % (args.max_backoff, args.timeout))
rmq = None
while True:
try:
chdir(args.workdir)
if not rmq:
rmq = pika.BlockingConnection(pika.ConnectionParameters(rmq_host, rmq_port))
ch = rmq.channel()
log.info("RabbitMQ connection OK")
method,header,body = ch.basic_get(args.queue, no_ack=False)
if method:
log.info("Job received")
try:
env,body = body.split("\n", 1) # first line is env
with open(script, "w") as f:
f.write(body)
chmod(script, 0700)
ch.basic_ack(method.delivery_tag)
try:
rmq.close()
except Exception:
pass
call(["env"] + env.split(";") + ["./"+script])
remove(script)
except OSError as e:
log.warning("Cannot execute jobs: %s" % e)
break
else:
log.info("Nothing received")
except Exception as e:
log.debug("Connection problem (%s): %s" % (e.__class__.__name__, e))
rmq = None
if time() < timeout_at:
log.debug("Retry in %d s" % backoff)
sleep(backoff)
backoff = min(2*backoff, args.max_backoff)
else:
log.error("Nothing done in due time, exiting")
break
try:
rmq.close()
except Exception as e:
pass