run.py
import argparse
import os
import asyncio
import json
import time
import logging
import uuid
import multiprocessing as mp

import uvloop
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaConnectionError, CommitFailedError

from configs.local import local_config
from configs.prod import prod_config
from src.utils.errors import main_thread_exception_handler
from src.utils.workers import WorkerThread
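# `WorkerThread` lives in src/utils/workers.py and is not shown in this file.
# A minimal sketch of the interface this script relies on (each worker runs
# its own event loop and exposes `queue`, `loop`, and `task_count`); the real
# implementation may differ:
#
#     class WorkerThread(threading.Thread):
#         def __init__(self, name, main_loop, consumer):
#             super().__init__(name=name)
#             self.loop = asyncio.new_event_loop()  # per-thread loop
#             self.queue = None                     # created once the loop runs
#             self.task_count = 0                   # used to pick the idlest worker
#
#         def run(self):
#             asyncio.set_event_loop(self.loop)
#             self.queue = asyncio.Queue()
#             self.loop.run_forever()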
# We do not need a huge number of worker threads because each thread runs
# its tasks concurrently on its own event loop; note that this approach only
# benefits I/O-bound tasks.
_cpu_count = os.cpu_count()
WORKER_COUNT = (_cpu_count // 2) - 1 if _cpu_count and _cpu_count > 8 else 3
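# Worked example: on a 16-core host this yields (16 // 2) - 1 = 7 workers;
# with 8 cores or fewer (or an undetectable CPU count) it falls back to 3.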
logger = logging.getLogger(__name__)
CONFIG_MAPPING = {
    "local": local_config,
    "prod": prod_config,
}
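# A sketch of the config shape this script expects; the keys below are the
# ones actually read in this file, but the values are illustrative only
# (real values live in configs/local.py and configs/prod.py):
#
#     local_config = {
#         "topic_name": "events",          # hypothetical
#         "kafka_host": "localhost:9092",  # hypothetical
#         "group_id": "event-consumers",   # hypothetical
#         "partitions_count": 4,           # hypothetical
#     }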
async def main(server_settings):
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(main_thread_exception_handler)
    threads = []
    # TODO: monitor throughput and consider manual partition assignment
    # (see the sketch below the consumer construction)
    # note: the explicit loop= argument was removed in newer aiokafka;
    # the running loop is picked up automatically
    consumer = AIOKafkaConsumer(
        server_settings["topic_name"],
        bootstrap_servers=server_settings["kafka_host"],
        client_id=mp.current_process().name,
        enable_auto_commit=False,
        max_poll_interval_ms=600000,
        heartbeat_interval_ms=6000,
        max_poll_records=1000,
        auto_offset_reset='earliest',
        group_id=server_settings["group_id"])
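    # A sketch of the manual assignment mentioned in the TODO above, assuming
    # a hypothetical `partition_index` key were added to the config; the
    # consumer would then be constructed WITHOUT the topic argument, since
    # subscribing and assigning are mutually exclusive:
    #
    #     from aiokafka import TopicPartition
    #     consumer.assign([TopicPartition(server_settings["topic_name"],
    #                                     server_settings["partition_index"])])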
    for thread_index in range(WORKER_COUNT):
        worker_thread = WorkerThread("WorkerThread-{}".format(thread_index), loop, consumer)
        worker_thread.daemon = True
        worker_thread.start()
        # the queue is created on the worker's own loop; wait until it exists
        while getattr(worker_thread, "queue", None) is None:
            logger.info("waiting for queues to initialize")
            time.sleep(0.1)
        threads.append(worker_thread)
    await consumer.start()
    try:
        async for msg in consumer:
            try:
                message = json.loads(msg.value)
                message['id'] = uuid.uuid4().hex
                # dispatch to the worker with the fewest scheduled tasks
                thread = min(threads, key=lambda t: t.task_count)
                asyncio.run_coroutine_threadsafe(thread.queue.put(message), loop=thread.loop)
                # commit as soon as the message is handed off (at-most-once delivery)
                await consumer.commit()
            except (KafkaConnectionError, CommitFailedError) as e:
                logger.exception("Connection error <{}>".format(str(e)))
            except Exception as e:
                logger.exception("Failed while putting message to queue <{}>".format(str(e)))
    finally:
        await consumer.stop()
def init_process(settings):
    # initialize the event loop with the uvloop policy
    # and start the consuming loop plus its worker threads
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    asyncio.run(main(settings))
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument("--config", default="local", choices=sorted(CONFIG_MAPPING))
    options = parser.parse_args()
    settings = CONFIG_MAPPING[options.config]

    processes = []
    # one consumer process per partition, alongside this main process
    partitions_count = settings["partitions_count"]
    for index in range(partitions_count):
        process = mp.Process(
            target=init_process,
            args=(settings,),
            name="ConsumerProcess-{}".format(index)
        )
        process.daemon = True
        process.start()
        processes.append(process)
    for process in processes:
        process.join()
    for process in processes:
        process.terminate()
    # TODO: make better use of this main process; join() blocks until the
    # children exit, so the terminate() loop above is effectively a no-op
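# Example invocation (config names resolved through CONFIG_MAPPING above):
#
#     python run.py --config prod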