Skip to content

Commit

Permalink
feat(kafka): avoid try block to use the default value "" of kafka vars
Browse files Browse the repository at this point in the history
  • Loading branch information
Smejky338 authored and jhutar committed Dec 22, 2023
1 parent 27494df commit 691baea
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 9 deletions.
8 changes: 3 additions & 5 deletions opl/consumer_lag.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,20 @@ def _getconsumer(self):
}

# Kafka consumer creation: SASL or noauth
try:
if self.username != "" and self.password != "":
logging.info(
f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
if self.username == "" or self.password == "":
raise ValueError("Password or username not provided!")
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.username,
"sasl_plain_password": self.password,
}
consumer = KafkaConsumer(**common_params, **sasl_params)
except (ValueError, AttributeError) as e:
else:
logging.info(
f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group_id} with timeout {common_params['session_timeout_ms']} ms"
f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
consumer = KafkaConsumer(**common_params)
return consumer
Expand Down
4 changes: 2 additions & 2 deletions opl/hbi_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def gen_send_verify(args, status_data):
if args.dry_run:
producer = None
else:
try:
if args.kafka_username != "" and args.kafka_password != "":
logging.info(
f"Creating SASL password-protected producer to {args.kafka_host}"
)
Expand All @@ -200,7 +200,7 @@ def gen_send_verify(args, status_data):
sasl_plain_username=args.kafka_username,
sasl_plain_password=args.kafka_password,
)
except AttributeError:
else:
logging.info(f"Creating passwordless producer to {args.kafka_host}")
producer = kafka.KafkaProducer(
bootstrap_servers=kafka_host,
Expand Down
4 changes: 2 additions & 2 deletions opl/post_kafka_times.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def produce_thread(args, config, produce_here, save_here):
"compression_type": args.compression_type,
}

try:
if args.kafka_username != "" and args.kafka_password != "":
logging.info(
f"Creating SASL password-protected producer to {args.kafka_host}"
)
Expand All @@ -352,7 +352,7 @@ def produce_thread(args, config, produce_here, save_here):
"sasl_plain_password": args.kafka_password,
}
produce_here = KafkaProducer(**common_params, **sasl_params)
except AttributeError:
else:
logging.info(
f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}"
)
Expand Down

0 comments on commit 691baea

Please sign in to comment.