Skip to content

Commit

Permalink
refactor(kafka consumers): Switch to common params hash and put consumer code into separate method, use default kafka username = ""
Browse files Browse the repository at this point in the history
  • Loading branch information
Smejky338 authored and jhutar committed Dec 22, 2023
1 parent 691baea commit 0b0442f
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 55 deletions.
49 changes: 21 additions & 28 deletions opl/get_kafka_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,38 +85,31 @@ def create_consumer(self):
self.status_data.set("parameters.kafka.topic", self.kafka_topic)
self.status_data.set("parameters.kafka.timeout", self.kafka_timeout)

try:
common_params = {
"bootstrap_servers": self.kafka_host,
"auto_offset_reset": "earliest",
"enable_auto_commit": True,
"group_id": self.kafka_group,
"max_poll_records": self.kafka_max_poll_records,
"session_timeout_ms": 50000,
"heartbeat_interval_ms": 10000,
"consumer_timeout_ms": self.kafka_timeout,
}

if self.kafka_username != "" and self.kafka_password != "":
logging.info(
f"Creating consumer with sasl username&pasword to {self.kafka_host}"
)
consumer = KafkaConsumer(
self.kafka_topic,
bootstrap_servers=self.kafka_host, # self.kafka_hosts,
auto_offset_reset="earliest",
enable_auto_commit=True,
group_id=self.kafka_group,
max_poll_records=self.kafka_max_poll_records,
session_timeout_ms=50000,
heartbeat_interval_ms=10000,
consumer_timeout_ms=self.kafka_timeout,
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username=self.kafka_username,
sasl_plain_password=self.kafka_password,
)
except AttributeError:
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.kafka_username,
"sasl_plain_password": self.kafka_password,
}
consumer = KafkaConsumer([self.kafka_topic], **common_params, **sasl_params)
else:
logging.info(f"Creating passwordless consumer to {self.kafka_host}")
consumer = KafkaConsumer(
self.kafka_topic,
bootstrap_servers=self.kafka_host,
auto_offset_reset="earliest",
enable_auto_commit=True,
group_id=self.kafka_group,
max_poll_records=self.kafka_max_poll_records,
session_timeout_ms=50000,
heartbeat_interval_ms=10000,
consumer_timeout_ms=self.kafka_timeout,
)
consumer = KafkaConsumer([self.kafka_topic], **common_params)
return consumer

def store_now(self):
Expand Down
51 changes: 24 additions & 27 deletions opl/skip_to_end.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,46 +11,49 @@
from . import skelet


def create_consumer(args):
    """
    Create a KafkaConsumer for args.kafka_topic.

    Uses SASL_SSL/SCRAM-SHA-512 authentication when both
    args.kafka_username and args.kafka_password are non-empty strings,
    otherwise connects without authentication.

    :param args: parsed CLI arguments providing kafka_hosts, kafka_group,
                 kafka_timeout, kafka_topic, kafka_username and kafka_password
    :return: configured KafkaConsumer instance
    """
    # Common parameters for both the SASL and the passwordless case
    common_params = {
        "bootstrap_servers": args.kafka_hosts,
        "auto_offset_reset": "latest",
        "enable_auto_commit": True,
        "group_id": args.kafka_group,
        "session_timeout_ms": 50000,
        "heartbeat_interval_ms": 10000,
        "consumer_timeout_ms": args.kafka_timeout,
    }

    # Kafka consumer creation: SASL or noauth
    if args.kafka_username != "" and args.kafka_password != "":
        logging.info(
            f"Creating SASL password-protected Kafka consumer for {args.kafka_hosts} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
        )
        sasl_params = {
            "security_protocol": "SASL_SSL",
            "sasl_mechanism": "SCRAM-SHA-512",
            "sasl_plain_username": args.kafka_username,
            "sasl_plain_password": args.kafka_password,
        }
        # NOTE(review): KafkaConsumer expects topics as positional string
        # arguments, not wrapped in a list — [args.kafka_topic] would fail
        # the "topics must be strings" check in subscribe().
        consumer = KafkaConsumer(args.kafka_topic, **common_params, **sasl_params)
    else:
        logging.info(
            f"Creating passwordless consumer for {args.kafka_hosts} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
        )
        consumer = KafkaConsumer(args.kafka_topic, **common_params)
    return consumer

def doit_seek_to_end(args):
"""
Create consumer and seek to end
This seek to end is important so we are not wasting our time processing
all the messages in the Kafka log for given topic. If we would have same
and static group name, we would have problems when running concurrently
on multiple pods.
"""

consumer = create_consumer(args)

# Seek to end
for attempt in range(10):
Expand All @@ -71,13 +74,7 @@ def doit_seek_to_end(kafka_hosts, kafka_timeout, kafka_topic, kafka_group, args=


def doit(args, status_data):
doit_seek_to_end(
[f"{args.kafka_host}:{args.kafka_port}"],
args.kafka_timeout,
args.kafka_topic,
args.kafka_group,
args,
)
doit_seek_to_end(args)

status_data.set("parameters.kafka.seek_topic", args.kafka_topic)
status_data.set("parameters.kafka.seek_timeout", args.kafka_timeout)
Expand Down

0 comments on commit 0b0442f

Please sign in to comment.