-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconsumer_benchmark.py
60 lines (51 loc) · 2.02 KB
/
consumer_benchmark.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import logging
import uuid
import dill
import pandas as pd
from confluent_kafka import Consumer
from kafka_wrapper import Worker
from constants import BOOTSTRAP_SERVER, TOPIC
# Logging for the kafka_wrapper worker: emit DEBUG and above through a
# StreamHandler (stderr by default) using a timestamped, level-tagged format.
logger = logging.getLogger("kafka_wrapper.worker")
logger.setLevel(logging.DEBUG)

formatter = logging.Formatter(
    fmt="[%(asctime)s][%(levelname)s] %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
def deserializer(serialized):
    """Example deserializer function with extra sanity checking.

    Args:
        serialized: Raw message value; must be ``bytes``.

    Returns:
        The object reconstructed by ``dill.loads``.

    Raises:
        TypeError: If ``serialized`` is not ``bytes``.
    """
    # Explicit check instead of ``assert``: assert statements are stripped when
    # Python runs with ``-O``, which would silently disable this sanity check.
    if not isinstance(serialized, bytes):
        raise TypeError(
            "Expecting a bytes, got {0}".format(type(serialized).__name__)
        )
    # SECURITY: dill.loads (like pickle.loads) can execute arbitrary code while
    # deserializing. Only use this on messages from a trusted producer.
    return dill.loads(serialized)
def consumer_benchmark():
    """Time the confluent-kafka consumer run, print its throughput, and return it.

    Returns:
        A dict with a single key, ``'confluent_python_kafka_consumer'``, mapped
        to the value returned by ``python_kafka_consumer_performance()``
        (presumably the elapsed time in seconds).
    """
    label = 'confluent_python_kafka_consumer'
    elapsed = python_kafka_consumer_performance()
    calculate_thoughput(elapsed)
    return {label: elapsed}
def calculate_thoughput(timing, n_messages=1000000, msg_size=100):
    """Print throughput statistics for a timed consumer run.

    Args:
        timing: Elapsed time of the run, in seconds; must be positive.
        n_messages: Number of messages processed during ``timing``.
        msg_size: Assumed size of each message, in bytes.

    Raises:
        ValueError: If ``timing`` is not positive, since the rates would be
            undefined (division by zero) or meaningless (negative).
    """
    if timing <= 0:
        raise ValueError(
            "timing must be a positive number of seconds, got {0!r}".format(timing)
        )
    # The divisor is 1024 * 1024, so the "MB/s" figure is really MiB/s.
    megabytes_per_sec = (msg_size * n_messages) / timing / (1024 * 1024)
    messages_per_sec = n_messages / timing
    print("Processed {0} messages in {1:.2f} seconds".format(n_messages, timing))
    print("{0:.2f} MB/s".format(megabytes_per_sec))
    print("{0:.2f} Msgs/s".format(messages_per_sec))
def python_kafka_consumer_performance(msg_count=1000000):
    """Consume up to ``msg_count`` messages from ``TOPIC`` via ``Worker``.

    Each call generates a fresh ``group.id``, so the consumer group has no
    committed offsets and, with ``auto.offset.reset = 'earliest'``, reads the
    topic from its earliest available offset.

    Args:
        msg_count: Maximum number of messages to consume (forwarded to
            ``Worker.start_process`` as ``max_messages``).

    Returns:
        Whatever ``Worker.start_process(..., test=True)`` returns; presumably
        the elapsed time in seconds. TODO confirm against kafka_wrapper.
    """
    conf = {
        'bootstrap.servers': BOOTSTRAP_SERVER,
        # Pass the UUID as text explicitly; config property values are
        # strings, so don't rely on implicit conversion of a UUID object.
        'group.id': str(uuid.uuid1()),
        'session.timeout.ms': 6000,
        'default.topic.config': {
            'auto.offset.reset': 'earliest',
        },
    }
    consumer = Consumer(**conf)
    worker = Worker(
        topic=TOPIC, consumer=consumer, config=conf, deserializer=deserializer
    )
    # NOTE(review): the consumer is not closed in this function; confirm that
    # Worker.start_process closes it, otherwise add consumer.close().
    return worker.start_process(max_messages=msg_count, test=True)
if __name__ == "__main__":
    # Run the benchmark and show its timings as a one-column table.
    timings = consumer_benchmark()
    frame = pd.DataFrame.from_dict(timings, orient='index')
    # from_dict(orient='index') on scalar values yields a column named 0.
    frame = frame.rename(columns={0: 'time_in_seconds'})
    print("consumer_performance")
    print(frame)