-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfluent_consumer_3.py
36 lines (32 loc) · 1.1 KB
/
confluent_consumer_3.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import logging
import dill
from confluent_kafka import Consumer
from kafka_wrapper import Worker
from constants import BOOTSTRAP_SERVER, GROUP_ID, TOPIC
# Set up logging
formatter = logging.Formatter(
fmt="[%(asctime)s][%(levelname)s] %(message)s", datefmt="%Y-%m-%d %H:%M:%S"
)
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger = logging.getLogger("kafka_wrapper.worker")
logger.setLevel(logging.DEBUG)
logger.addHandler(stream_handler)
def deserializer(serialized):
"""Example deserializer function with extra sanity checking."""
assert isinstance(serialized, bytes), "Expecting a bytes"
return dill.loads(serialized)
if __name__ == "__main__":
conf = {'bootstrap.servers': BOOTSTRAP_SERVER,
'group.id': GROUP_ID,
'session.timeout.ms': 6000,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}
}
consumer = Consumer(**conf)
worker = Worker(
topic=TOPIC, consumer=consumer, config=conf, deserializer=deserializer
)
page_views = {}
worker.start_process(page_views)