
Refactor(Kafka): Extract and unify KafkaConsumer & KafkaProducer constructors, feat: use multiple brokers to disperse load with arg --kafka-hosts #121

Merged: 34 commits, Feb 27, 2024

Commits (34)
6ef513b
fix(skip_to_end): Supply server port as well
Smejky338 Jan 12, 2024
971f96a
refactor(post_kafka): Unify Kafka_host assignment to before common_pa…
Smejky338 Jan 12, 2024
f3bba4b
fix(get_kafka): Omit list bracets in non-pointer supplying to Kafka
Smejky338 Jan 12, 2024
dd13bf7
feat(skelet): Add INFO output option
Smejky338 Jan 12, 2024
39f9234
lint(skip_to_end)
Smejky338 Jan 12, 2024
be32c45
fix(post_kafka): Give back kafka_ prefix
Smejky338 Jan 12, 2024
9708068
feat(skelet setup): Rename --info to --verbose to be unified with others
Smejky338 Jan 12, 2024
0a35464
fix(skip_end): Omit explicit logging level logic, use what's in skele…
Smejky338 Jan 12, 2024
3e2ce80
feat(HBI): Use retries and timeout from args in KafkaProducer
Smejky338 Feb 13, 2024
8f0b418
Merge remote-tracking branch 'upstream/main' into msk
Smejky338 Feb 13, 2024
c19beda
refactor(Kafka): Extract and unify KafkaConsumer & KafkaProducer cons…
Smejky338 Feb 14, 2024
a6fde9d
Merge remote-tracking branch 'upstream/main' into refactor-kafka
Smejky338 Feb 14, 2024
5420893
refactor(postKafkaTimes): Use kafka_topic from args, don't duplicate it
Smejky338 Feb 14, 2024
13402fd
docs(kafka_init): Add basic function descriptions
Smejky338 Feb 14, 2024
b9d396b
fix(skiptoend): Remove param status_data in call as it isn't impleme…
Smejky338 Feb 15, 2024
52e80ef
feat(opl args): Improve kafka address arguments to clarify priority o…
Smejky338 Feb 15, 2024
f3e5ddc
Merge remote-tracking branch 'upstream/main' into refactor-kafka
Smejky338 Feb 15, 2024
343b185
fix(kafka producer init): Handle missing args.dry_run without throwin…
Smejky338 Feb 15, 2024
6157a06
fix(kafka init Consumer): Fix bootstrap server path
Smejky338 Feb 16, 2024
595e6e5
fix(kafka_init bootstrap address): default return to the host:port va…
Smejky338 Feb 16, 2024
f957e66
fix(consumer_lag): Missing self in constructor args
Smejky338 Feb 16, 2024
3b6c2e5
lint fixes
Smejky338 Feb 16, 2024
ce08590
refactor(kafka args for init): unify args naming for kafka to prefix …
Smejky338 Feb 16, 2024
f25d937
fix(get_kafka_times): Add missing `kafka_max_poll_records` as 100
Smejky338 Feb 16, 2024
bd4648e
lint fix kafka_init : comment with single #
Smejky338 Feb 16, 2024
4478fbf
fix lint blank line
Smejky338 Feb 16, 2024
7948848
fix(kafka_init): Default to a value without throwin an exception
Smejky338 Feb 19, 2024
790d815
fix(Kafka init): fix username and password check path and loggging ar…
Smejky338 Feb 19, 2024
4d0eb40
fix(Kafka init): obtain logging params from sanitized initialization …
Smejky338 Feb 19, 2024
ea0f217
feat(kafka_init): Wrap in same-named class for outside use
Smejky338 Feb 19, 2024
9a32217
fix(msk): Fix path to kafka_init due to added class
Smejky338 Feb 19, 2024
afc51ae
fix(kafka_init): Fix reference to internal class method, make it static
Smejky338 Feb 20, 2024
53c32d5
feat(kafka init): make producer&consumer getters static
Smejky338 Feb 20, 2024
5c5fa3c
lint fix: Remove leading newline, fix import
Smejky338 Feb 20, 2024
9 changes: 7 additions & 2 deletions opl/args.py
@@ -90,7 +90,12 @@ def add_kafka_opts(parser):
parser.add_argument(
"--kafka-host",
default=os.getenv("KAFKA_HOST", "localhost"),
help="Kafka host (also use env variable KAFKA_HOST)",
help="Kafka host (also use env variable KAFKA_HOST). Can get overriden by --kafka-hosts arg or KAFKA_HOSTS envvar.",
)
parser.add_argument(
"--kafka-hosts",
default=os.getenv("KAFKA_HOSTS", ""),
help="Comma-separated list of hosts, including their ports (also use env variable KAFKA_HOSTS). Takes precedence over --kafka-host and --kafka-port or their envvar variants.",
)
parser.add_argument(
"--kafka-port",
@@ -131,7 +136,7 @@ def add_kafka_opts(parser):
help="The client is going to wait this much time for the server to respond to a request (also use env variable KAFKA_REQUEST_TIMEOUT_MS)",
)
parser.add_argument(
"--kafka-max_block_ms",
"--kafka-max-block-ms",
type=int,
default=int(os.getenv("KAFKA_MAX_BLOCK_MS", 60000)),
help="Max time to block send e.g. because buffer is full (also use env variable KAFKA_MAX_BLOCK_MS)",
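For a quick illustration of the precedence described in these help texts, here is a minimal sketch using kafka_init.kafka_bootstrap from the new opl/kafka_init.py added later in this diff; the broker addresses are placeholders:

from argparse import Namespace
from opl.kafka_init import kafka_init

# --kafka-hosts (or KAFKA_HOSTS) wins whenever it is non-empty
args = Namespace(kafka_hosts="broker1:9092,broker2:9092", kafka_host="localhost", kafka_port=9092)
print(kafka_init.kafka_bootstrap(args))  # ['broker1:9092', 'broker2:9092']

# with --kafka-hosts left at its default "", the single host:port pair is used
args = Namespace(kafka_hosts="", kafka_host="localhost", kafka_port=9092)
print(kafka_init.kafka_bootstrap(args))  # 'localhost:9092'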
57 changes: 14 additions & 43 deletions opl/consumer_lag.py
@@ -2,8 +2,8 @@

import logging
import time
from kafka import KafkaConsumer
from kafka import TopicPartition
from opl.kafka_init import kafka_init


class ConsumerLag:
@@ -12,65 +12,36 @@ class ConsumerLag:
bootstrap_server and kafka group as input.
"""

def __init__(
self, topic, bootstrap_servers, group, username="", password=""
) -> None:
self.topic = topic
self.group = group
self.bootstrap_servers = bootstrap_servers
def __init__(self, args, kafka_topic) -> None:
self.args = args
self.args.kafka_topic = kafka_topic
self.logger = logging.getLogger("consumer_lag")
self.offset_records = {}
self.username = username
self.password = password

def _getconsumer(self):
# Common parameters for both cases
common_params = {
"bootstrap_servers": self.bootstrap_servers,
"auto_offset_reset": "latest",
"enable_auto_commit": False,
"max_poll_records": 50,
"max_poll_interval_ms": 300000,
"group_id": self.group,
"session_timeout_ms": 50000,
"heartbeat_interval_ms": 10000,
"consumer_timeout_ms": 100000,
}
self.args.kafka_max_poll_records = 50
self.args.kafka_max_poll_interval_ms = 300000
self.args.kafka_session_timeout_ms = 50000
self.args.kafka_heartbeat_interval_ms = 10000
self.args.kafka_timeout = 100000

# Kafka consumer creation: SASL or noauth
if self.username != "" and self.password != "":
logging.info(
f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.username,
"sasl_plain_password": self.password,
}
consumer = KafkaConsumer(**common_params, **sasl_params)
else:
logging.info(
f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
)
consumer = KafkaConsumer(**common_params)
return consumer
return kafka_init.get_consumer(self.args)

def store_offset_records(self):
consumer = self._getconsumer()
partition_set = consumer.partitions_for_topic(self.topic)
partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
counter = 0
while counter < 5:
counter += 1
partition_set = consumer.partitions_for_topic(self.topic)
partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
if partition_set:
break
else:
time.sleep(10)

partitions = []
for partition_id in partition_set:
partitions.append(TopicPartition(self.topic, partition_id))
partitions.append(TopicPartition(self.args.kafka_topic, partition_id))

curr_offsets = {}
for partition in partitions:
@@ -83,7 +54,7 @@ def store_offset_records(self):
record = {
"curr_offset": value,
"end_offset": end_offsets[
TopicPartition(topic=self.topic, partition=partition_id)
TopicPartition(topic=self.args.kafka_topic, partition=partition_id)
],
}
self.offset_records[partition_id] = record
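For callers, the refactor boils down to passing the parsed args plus the topic name. A minimal sketch, assuming opl.args.add_kafka_opts also provides the kafka group and credential options that kafka_init reads (only the host, port, and timeout options are visible in the args.py hunk above); the topic name is illustrative:

import argparse
import opl.args
from opl.consumer_lag import ConsumerLag

parser = argparse.ArgumentParser()
opl.args.add_kafka_opts(parser)
args = parser.parse_args()

# the topic now travels through args.kafka_topic instead of a dedicated constructor parameter
lag = ConsumerLag(args, kafka_topic="example-topic")
lag.store_offset_records()
# offset_records maps partition id -> {"curr_offset": ..., "end_offset": ...}
print(lag.offset_records)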
51 changes: 9 additions & 42 deletions opl/get_kafka_times.py
@@ -5,12 +5,11 @@
import logging
import os

from kafka import KafkaConsumer

import opl.args
import opl.data
import opl.db
import opl.skelet
from opl.kafka_init import kafka_init

import psycopg2
import psycopg2.extras
@@ -28,14 +27,11 @@ def __init__(self, args, status_data, custom_methods):
"password": args.storage_db_pass,
}
self.connection = psycopg2.connect(**storage_db_conf)

self.status_data = status_data
self.kafka_host = f"{args.kafka_host}:{args.kafka_port}"
self.kafka_group = args.kafka_group
self.kafka_topic = args.kafka_topic
self.kafka_timeout = args.kafka_timeout
self.kafka_max_poll_records = 100
self.kafka_username = args.kafka_username
self.kafka_password = args.kafka_password
self.args = args
self.args.kafka_max_poll_records = 100

self.queries_definition = yaml.load(
args.tables_definition, Loader=yaml.SafeLoader
)["queries"]
@@ -79,38 +75,9 @@ def kafka_ts2dt(self, timestamp):
)

def create_consumer(self):
# Store Kafka config to status data
self.status_data.set("parameters.kafka.bootstrap", self.kafka_host)
self.status_data.set("parameters.kafka.group", self.kafka_group)
self.status_data.set("parameters.kafka.topic", self.kafka_topic)
self.status_data.set("parameters.kafka.timeout", self.kafka_timeout)

common_params = {
"bootstrap_servers": self.kafka_host,
"auto_offset_reset": "earliest",
"enable_auto_commit": True,
"group_id": self.kafka_group,
"max_poll_records": self.kafka_max_poll_records,
"session_timeout_ms": 50000,
"heartbeat_interval_ms": 10000,
"consumer_timeout_ms": self.kafka_timeout,
}

if self.kafka_username != "" and self.kafka_password != "":
logging.info(
f"Creating consumer with sasl username&pasword to {self.kafka_host}"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": self.kafka_username,
"sasl_plain_password": self.kafka_password,
}
consumer = KafkaConsumer(self.kafka_topic, **common_params, **sasl_params)
else:
logging.info(f"Creating passwordless consumer to {self.kafka_host}")
consumer = KafkaConsumer(self.kafka_topic, **common_params)
return consumer
self.args.kafka_auto_offset_reset = "earliest"
self.args.kafka_enable_auto_commit = True
return kafka_init.get_consumer(self.args, self.status_data)

def store_now(self):
"""
@@ -168,7 +135,7 @@ def process_messages(self):
while True:
msg_pack = consumer.poll(
timeout_ms=5000,
max_records=self.kafka_max_poll_records,
max_records=self.args.kafka_max_poll_records,
update_offsets=True,
)
for topic, messages in msg_pack.items():
37 changes: 7 additions & 30 deletions opl/hbi_utils.py
@@ -4,12 +4,11 @@
import threading
import time

import kafka

import opl.args
import opl.db
import opl.generators.inventory_ingress
import opl.skelet
from opl.kafka_init import kafka_init

import psycopg2

@@ -183,35 +182,13 @@ def gen_send_verify(args, status_data):
inventory
) # fetch existing records count

kafka_host = f"{args.kafka_host}:{args.kafka_port}"
logging.info(f"Creating producer to {kafka_host}")
if args.dry_run:
producer = None
else:
if args.kafka_username != "" and args.kafka_password != "":
logging.info(
f"Creating SASL password-protected producer to {args.kafka_host}"
)
producer = kafka.KafkaProducer(
bootstrap_servers=kafka_host,
# api_version=(0, 10),
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username=args.kafka_username,
sasl_plain_password=args.kafka_password,
request_timeout_ms=args.kafka_request_timeout_ms,
retries=args.kafka_retries,
)
else:
logging.info(f"Creating passwordless producer to {args.kafka_host}")
producer = kafka.KafkaProducer(
bootstrap_servers=kafka_host,
api_version=(0, 10),
request_timeout_ms=args.kafka_request_timeout_ms,
retries=args.kafka_retries,
)
logging.info(f"Creating producer to {args.kafka_host}")

# With MSK, a few % of connections usually drop with a BrokerNotAvailable error, so we need to retry here.
# This one-liner below overrides args.py's default of 0 retries to 3.
args.kafka_retries = 3 if args.kafka_retries == 0 else args.kafka_retries

status_data.set("parameters.kafka.bootstrap", kafka_host)
producer = kafka_init.get_producer(args)

logging.info("Creating data structure to store list of accounts and so")
collect_info = {"accounts": {}} # simplified info about hosts
117 changes: 117 additions & 0 deletions opl/kafka_init.py
@@ -0,0 +1,117 @@
from kafka import KafkaProducer, KafkaConsumer

import logging

# Common instantiators for KafkaProducer and KafkaConsumer


class kafka_init:
@staticmethod
def kafka_bootstrap(args):
try:
return args.kafka_bootstrap
except AttributeError:
try:
if args.kafka_hosts != "":
return args.kafka_hosts.split(",")
except AttributeError:
pass
return f"{args.kafka_host}:{args.kafka_port}"

# Based on the args, obtain KafkaProducer instance
@staticmethod
def get_producer(args, status_data=None):
bootstrap_servers = kafka_init.kafka_bootstrap(args)

# Sanitize acks setting
if args.kafka_acks != "all":
args.kafka_acks = int(args.kafka_acks)

if hasattr(args, "dry_run") and args.dry_run:
logging.info("NOT creating a producer as this is a dry run")
producer = None
else:
common_params = {
"bootstrap_servers": bootstrap_servers,
"acks": getattr(args, "kafka_acks", None),
"request_timeout_ms": getattr(args, "kafka_request_timeout_ms", 30000),
"max_block_ms": getattr(args, "kafka_max_block_ms", 60000),
"linger_ms": getattr(args, "kafka_linger_ms", 0),
"compression_type": getattr(args, "kafka_compression_type", None),
"batch_size": getattr(args, "kafka_batch_size", 16384),
"buffer_memory": getattr(args, "kafka_buffer_memory", 33554432),
"retries": getattr(args, "kafka_retries", 0),
}

if args.kafka_username != "" and args.kafka_password != "":
logging.info(
f"Creating SASL password-protected producer to {bootstrap_servers}"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": args.kafka_username,
"sasl_plain_password": args.kafka_password,
}
producer = KafkaProducer(**common_params, **sasl_params)
else:
logging.info(f"Creating passwordless producer to {bootstrap_servers}")
producer = KafkaProducer(**common_params)

if status_data is not None:
status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
status_data.set("parameters.kafka.group", args.kafka_group)
status_data.set("parameters.kafka.topic", args.kafka_topic)
status_data.set("parameters.kafka.timeout", args.kafka_timeout)

return producer

# Based on the args, obtain KafkaConsumer instance.
# If args.kafka_topic is supplied, subscribe to the topic.
@staticmethod
def get_consumer(args, status_data=None):
bootstrap_servers = kafka_init.kafka_bootstrap(args)

# Common parameters for both cases
common_params = {
"bootstrap_servers": bootstrap_servers,
"auto_offset_reset": getattr(args, "kafka_auto_offset_reset", "latest"),
"enable_auto_commit": getattr(args, "kafka_enable_auto_commit", False),
"max_poll_records": getattr(args, "kafka_max_poll_records", 50),
"max_poll_interval_ms": getattr(args, "kafka_max_poll_interval_ms", 300000),
"group_id": getattr(args, "kafka_group", None),
"session_timeout_ms": getattr(args, "kafka_session_timeout_ms", 50000),
"heartbeat_interval_ms": getattr(
args, "kafka_heartbeat_interval_ms", 10000
),
"consumer_timeout_ms": getattr(args, "kafka_timeout", 100000),
}

# Kafka consumer creation: SASL or noauth
if args.kafka_username != "" and args.kafka_password != "":
logging.info(
f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
)
sasl_params = {
"security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512",
"sasl_plain_username": args.kafka_username,
"sasl_plain_password": args.kafka_password,
}
consumer = KafkaConsumer(**common_params, **sasl_params)
else:
logging.info(
f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
)
consumer = KafkaConsumer(**common_params)

if status_data is not None:
status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
status_data.set("parameters.kafka.group", args.kafka_group)
status_data.set("parameters.kafka.topic", args.kafka_topic or "")
status_data.set("parameters.kafka.timeout", args.kafka_timeout)

if args.kafka_topic and args.kafka_topic != "":
consumer.subscribe([args.kafka_topic])

return consumer
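To summarize the new entry points, here is a minimal usage sketch. It assumes opl.args.add_kafka_opts also defines the topic, group, acks, and credential options referenced throughout this diff; the payload is a placeholder:

import argparse
import opl.args
from opl.kafka_init import kafka_init

parser = argparse.ArgumentParser()
opl.args.add_kafka_opts(parser)
args = parser.parse_args()

# get_producer returns None when args.dry_run is set, so callers guard against that
producer = kafka_init.get_producer(args)
if producer is not None:
    producer.send(args.kafka_topic, value=b'{"example": "payload"}')

# optional kafka_* attributes fall back to the getattr defaults above;
# since args.kafka_topic is set, the returned consumer is already subscribed to it
args.kafka_auto_offset_reset = "earliest"
consumer = kafka_init.get_consumer(args)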