forked from Attumm/meesee
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmeesee.py
129 lines (104 loc) · 4.12 KB
/
meesee.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
import sys
import time
import json
import traceback
import redis
from multiprocessing import Pool
# Default queue configuration consumed by startapp / RedisQueue when the
# caller does not supply one.
config = {
    "namespace": "removeme",  # prefix for the Redis list key ("<namespace>:<key>")
    "key": "tasks",           # list name within the namespace
    "redis_config": {},       # kwargs forwarded verbatim to redis.Redis()
    "maxsize": 1000,          # soft cap on queue length enforced by send()
}
class RedisQueue:
    """FIFO queue backed by a Redis list named '<namespace>:<key>'.

    Items are appended with RPUSH and consumed with BLPOP, giving
    first-in-first-out order.  Iterating over an instance blocks until an
    item arrives; if `timeout` is set and expires, iteration stops.
    """

    def __init__(self, namespace, key, redis_config, maxsize=None, timeout=None):
        self.r = redis.Redis(**redis_config)
        self.key = key
        self.namespace = namespace
        self.maxsize = maxsize  # soft size cap enforced in send(); None = unbounded
        self.timeout = timeout  # BLPOP timeout in seconds; None blocks forever
        self.list_key = self.format_list_key(namespace, key)

    def format_list_key(self, namespace, key):
        """Return the Redis list key '<namespace>:<key>'.

        BUG FIX: parameters were previously declared as (key, namespace)
        although every caller passes (namespace, key) positionally; the
        names now match the actual call order.
        """
        return '{}:{}'.format(namespace, key)

    def set_list_key(self, key=None, namespace=None):
        """Re-point this queue at another list; unset arguments keep their value."""
        if key is not None:
            self.key = key
        if namespace is not None:
            self.namespace = namespace
        self.list_key = self.format_list_key(self.namespace, self.key)

    def first_inline_send(self, item):
        # TODO rename method: pushes `item` to the FRONT of the list so it is
        # consumed next (used to re-queue an in-flight item on worker shutdown).
        self.r.lpush(self.list_key, item)

    def send_to(self, key, item):
        """Append `item` to a different list within the same namespace."""
        self.r.rpush('{}:{}'.format(self.namespace, key), item)

    def send(self, item):
        """Adds item to the end of the Redis List.

        Side-effects:
            If size is above max size, the operation will keep the size the same.
            Note that it does not resize the list to maxsize.
        """
        if self.maxsize is not None and self.r.llen(self.list_key) >= self.maxsize:
            self.r.lpop(self.list_key)  # drop the oldest item to make room
        self.r.rpush(self.list_key, item)

    def send_dict(self, item):
        """JSON-encode `item` and enqueue the resulting string."""
        self.send(json.dumps(item))

    def __iter__(self):
        return self

    def __next__(self):
        # BLPOP returns a (list_key, value) tuple, or None when the timeout
        # elapses; None ends the iteration.
        result = self.r.blpop(self.list_key, self.timeout)
        if result is None:
            raise StopIteration
        return result
class InitFail(Exception):
    """Raised by init_add when constructing an init item fails (TypeError)."""
    pass
def init_add(func_kwargs, init_items, init_kwargs):
    """Instantiate each configured init item and store it in func_kwargs.

    For every name in init_kwargs, calls init_items[name](**init_kwargs[name])
    and assigns the result to func_kwargs[name].  Mutates and returns
    func_kwargs so workers get freshly constructed objects on each restart.

    Raises:
        InitFail: if any factory call raises TypeError (e.g. bad kwargs),
            chained from the original exception.
    """
    try:
        # Loop variable renamed from `config`, which shadowed the
        # module-level default config dict.
        for name, factory_kwargs in init_kwargs.items():
            func_kwargs[name] = init_items[name](**factory_kwargs)
    except TypeError as e:
        raise InitFail from e
    return func_kwargs
def setup_init_items(func_kwargs, init_kwargs):
    """Snapshot the factory objects named by init_kwargs out of func_kwargs.

    Returns a new dict mapping each name in init_kwargs to its current value
    in func_kwargs, so the originals survive later overwrites by init_add.
    """
    snapshot = {}
    for name in init_kwargs:
        snapshot[name] = func_kwargs[name]
    return snapshot
def run_worker(func, func_kwargs, on_failure_func, config, worker_id, init_kwargs):
    """Consume items from a RedisQueue in an endless loop, calling `func` on each.

    Exits on init failure, keyboard interrupt / system exit, or — when the
    queue config carries a 'timeout' — after the blocking pop times out.
    Any other exception restarts the worker after a short sleep.
    """
    # A list of funcs/configs is distributed across workers round-robin by id.
    if isinstance(func, list):
        func = func[worker_id%len(func)]
    if isinstance(config, list):
        config = config[worker_id%len(config)]
    item, r = None, None
    # Snapshot the factory objects named in init_kwargs so every restart of the
    # loop re-creates them from scratch via init_add.
    init_items = setup_init_items(func_kwargs, init_kwargs)
    while True:
        try:
            func_kwargs = init_add(func_kwargs, init_items, init_kwargs)
            r = RedisQueue(**config)  # TODO rename r
            sys.stdout.write('worker {worker_id} started\n'.format(worker_id=worker_id))
            # Each iteration yields a (list_key, value) tuple; value is raw bytes.
            for key_name, item in r:
                func(item.decode('utf-8'), worker_id, **func_kwargs)
        except InitFail:
            sys.stdout.write('worker {worker_id} initialization failed\n'.format(worker_id=worker_id))
            traceback.print_exc()
            break
        except (KeyboardInterrupt, SystemExit):
            sys.stdout.write('worker {worker_id} stopped\n'.format(worker_id=worker_id))
            # Push the in-flight item back to the front of the queue so it is
            # not lost.  NOTE(review): if the interrupt arrives before
            # RedisQueue() succeeds, r is None and this raises — confirm intended.
            r.first_inline_send(item)
            break
        except Exception as e:
            sys.stdout.write('worker {worker_id} failed reason {e}\n'.format(worker_id=worker_id, e=e))
            if on_failure_func is not None:
                sys.stdout.write('worker {worker_id} running failure handler {e}\n'.format(worker_id=worker_id, e=e))
                on_failure_func(item, e, r, worker_id)
            time.sleep(0.1)  # Throttle restarting
        # With a configured timeout, the for-loop above ends via StopIteration;
        # treat that as "queue drained" and exit instead of restarting.
        if config.get('timeout') is not None:
            break
def startapp(func, func_kwargs=None, workers=10, config=config, on_failure_func=None, init_kwargs=None):
    """Start `workers` processes, each running run_worker against the queue.

    Args:
        func: callable (or list of callables, distributed round-robin) invoked
            per item as func(item, worker_id, **func_kwargs).
        func_kwargs: extra keyword arguments for func (default: empty dict).
        workers: number of worker processes; ids run from 1 to `workers`.
        config: RedisQueue configuration dict (or list of dicts).
        on_failure_func: optional handler called as (item, exc, queue, worker_id)
            when a worker iteration raises.
        init_kwargs: per-name factory kwargs for init_add (default: empty dict).

    BUG FIX: mutable default arguments ({}) replaced with None sentinels, and
    the pool is now always closed and joined (previously only on interrupt).
    """
    if func_kwargs is None:
        func_kwargs = {}
    if init_kwargs is None:
        init_kwargs = {}
    p = Pool(workers)
    args = ((func, func_kwargs, on_failure_func, config, worker_id, init_kwargs)
            for worker_id in range(1, workers + 1))
    try:
        p.starmap(run_worker, args)
    except (KeyboardInterrupt, SystemExit):
        sys.stdout.write('Starting Graceful exit\n')
    finally:
        # Release pool resources on every exit path, not just on interrupt.
        p.close()
        p.join()
        sys.stdout.write('Clean shut down\n')