diff --git a/opl/args.py b/opl/args.py
index 1fb7539..d63b61a 100644
--- a/opl/args.py
+++ b/opl/args.py
@@ -90,7 +90,12 @@ def add_kafka_opts(parser):
     parser.add_argument(
         "--kafka-host",
         default=os.getenv("KAFKA_HOST", "localhost"),
-        help="Kafka host (also use env variable KAFKA_HOST)",
+        help="Kafka host (also use env variable KAFKA_HOST). Can be overridden by the --kafka-hosts option or the KAFKA_HOSTS env variable.",
+    )
+    parser.add_argument(
+        "--kafka-hosts",
+        default=os.getenv("KAFKA_HOSTS", ""),
+        help="Comma-separated list of hosts including their ports (also use env variable KAFKA_HOSTS). Takes precedence over --kafka-host and --kafka-port or their env variable variants.",
     )
     parser.add_argument(
         "--kafka-port",
@@ -131,7 +136,7 @@ def add_kafka_opts(parser):
         help="The client is going to wait this much time for the server to respond to a request (also use env variable KAFKA_REQUEST_TIMEOUT_MS)",
     )
     parser.add_argument(
-        "--kafka-max_block_ms",
+        "--kafka-max-block-ms",
         type=int,
         default=int(os.getenv("KAFKA_MAX_BLOCK_MS", 60000)),
         help="Max time to block send e.g. because buffer is full (also use env variable KAFKA_MAX_BLOCK_MS)",
diff --git a/opl/consumer_lag.py b/opl/consumer_lag.py
index 26c0f43..85885b7 100644
--- a/opl/consumer_lag.py
+++ b/opl/consumer_lag.py
@@ -2,8 +2,8 @@
 import logging
 import time
 
-from kafka import KafkaConsumer
 from kafka import TopicPartition
+from opl.kafka_init import kafka_init
 
 
 class ConsumerLag:
@@ -12,57 +12,28 @@ class ConsumerLag:
     bootstrap_server and kafka group as input.
     """
 
-    def __init__(
-        self, topic, bootstrap_servers, group, username="", password=""
-    ) -> None:
-        self.topic = topic
-        self.group = group
-        self.bootstrap_servers = bootstrap_servers
+    def __init__(self, args, kafka_topic) -> None:
+        self.args = args
+        self.args.kafka_topic = kafka_topic
         self.logger = logging.getLogger("consumer_lag")
         self.offset_records = {}
-        self.username = username
-        self.password = password
 
     def _getconsumer(self):
-        # Common parameters for both cases
-        common_params = {
-            "bootstrap_servers": self.bootstrap_servers,
-            "auto_offset_reset": "latest",
-            "enable_auto_commit": False,
-            "max_poll_records": 50,
-            "max_poll_interval_ms": 300000,
-            "group_id": self.group,
-            "session_timeout_ms": 50000,
-            "heartbeat_interval_ms": 10000,
-            "consumer_timeout_ms": 100000,
-        }
+        self.args.kafka_max_poll_records = 50
+        self.args.kafka_max_poll_interval_ms = 300000
+        self.args.kafka_session_timeout_ms = 50000
+        self.args.kafka_heartbeat_interval_ms = 10000
+        self.args.kafka_timeout = 100000
 
-        # Kafka consumer creation: SASL or noauth
-        if self.username != "" and self.password != "":
-            logging.info(
-                f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
-            )
-            sasl_params = {
-                "security_protocol": "SASL_SSL",
-                "sasl_mechanism": "SCRAM-SHA-512",
-                "sasl_plain_username": self.username,
-                "sasl_plain_password": self.password,
-            }
-            consumer = KafkaConsumer(**common_params, **sasl_params)
-        else:
-            logging.info(
-                f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
-            )
-            consumer = KafkaConsumer(**common_params)
-        return consumer
+        return kafka_init.get_consumer(self.args)
 
     def store_offset_records(self):
         consumer = self._getconsumer()
-        partition_set = consumer.partitions_for_topic(self.topic)
+        partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
         counter = 0
         while counter < 5:
             counter += 1
-            partition_set = consumer.partitions_for_topic(self.topic)
+            partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
             if partition_set:
                 break
             else:
@@ -70,7 +41,7 @@ def store_offset_records(self):
 
         partitions = []
         for partition_id in partition_set:
-            partitions.append(TopicPartition(self.topic, partition_id))
+            partitions.append(TopicPartition(self.args.kafka_topic, partition_id))
 
         curr_offsets = {}
         for partition in partitions:
@@ -83,7 +54,7 @@
             record = {
                 "curr_offset": value,
                 "end_offset": end_offsets[
-                    TopicPartition(topic=self.topic, partition=partition_id)
+                    TopicPartition(topic=self.args.kafka_topic, partition=partition_id)
                 ],
             }
             self.offset_records[partition_id] = record
diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index 3cad994..0ef02b3 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -5,12 +5,11 @@
 import logging
 import os
 
-from kafka import KafkaConsumer
-
 import opl.args
 import opl.data
 import opl.db
 import opl.skelet
+from opl.kafka_init import kafka_init
 
 import psycopg2
 import psycopg2.extras
@@ -28,14 +27,11 @@ def __init__(self, args, status_data, custom_methods):
             "password": args.storage_db_pass,
         }
         self.connection = psycopg2.connect(**storage_db_conf)
+
         self.status_data = status_data
-        self.kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-        self.kafka_group = args.kafka_group
-        self.kafka_topic = args.kafka_topic
-        self.kafka_timeout = args.kafka_timeout
-        self.kafka_max_poll_records = 100
-        self.kafka_username = args.kafka_username
-        self.kafka_password = args.kafka_password
+        self.args = args
+        self.args.kafka_max_poll_records = 100
+
         self.queries_definition = yaml.load(
             args.tables_definition, Loader=yaml.SafeLoader
         )["queries"]
@@ -79,38 +75,9 @@ def kafka_ts2dt(self, timestamp):
         )
 
     def create_consumer(self):
-        # Store Kafka config to status data
-        self.status_data.set("parameters.kafka.bootstrap", self.kafka_host)
-        self.status_data.set("parameters.kafka.group", self.kafka_group)
-        self.status_data.set("parameters.kafka.topic", self.kafka_topic)
-        self.status_data.set("parameters.kafka.timeout", self.kafka_timeout)
-
-        common_params = {
-            "bootstrap_servers": self.kafka_host,
-            "auto_offset_reset": "earliest",
-            "enable_auto_commit": True,
-            "group_id": self.kafka_group,
-            "max_poll_records": self.kafka_max_poll_records,
-            "session_timeout_ms": 50000,
-            "heartbeat_interval_ms": 10000,
-            "consumer_timeout_ms": self.kafka_timeout,
-        }
-
-        if self.kafka_username != "" and self.kafka_password != "":
-            logging.info(
-                f"Creating consumer with sasl username&pasword to {self.kafka_host}"
-            )
-            sasl_params = {
-                "security_protocol": "SASL_SSL",
-                "sasl_mechanism": "SCRAM-SHA-512",
-                "sasl_plain_username": self.kafka_username,
-                "sasl_plain_password": self.kafka_password,
-            }
-            consumer = KafkaConsumer(self.kafka_topic, **common_params, **sasl_params)
-        else:
-            logging.info(f"Creating passwordless consumer to {self.kafka_host}")
-            consumer = KafkaConsumer(self.kafka_topic, **common_params)
-        return consumer
+        self.args.kafka_auto_offset_reset = "earliest"
+        self.args.kafka_enable_auto_commit = True
+        return kafka_init.get_consumer(self.args, self.status_data)
 
     def store_now(self):
         """
@@ -168,7 +135,7 @@ def process_messages(self):
         while True:
             msg_pack = consumer.poll(
                 timeout_ms=5000,
-                max_records=self.kafka_max_poll_records,
+                max_records=self.args.kafka_max_poll_records,
                 update_offsets=True,
             )
             for topic, messages in msg_pack.items():
diff --git a/opl/hbi_utils.py b/opl/hbi_utils.py
index 7dc5480..a494417 100644
--- a/opl/hbi_utils.py
+++ b/opl/hbi_utils.py
@@ -4,12 +4,11 @@
 import threading
 import time
 
-import kafka
-
 import opl.args
 import opl.db
 import opl.generators.inventory_ingress
 import opl.skelet
+from opl.kafka_init import kafka_init
 
 import psycopg2
 
@@ -183,35 +182,13 @@ def gen_send_verify(args, status_data):
         inventory
     )  # fetch existing records count
 
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    logging.info(f"Creating producer to {kafka_host}")
-    if args.dry_run:
-        producer = None
-    else:
-        if args.kafka_username != "" and args.kafka_password != "":
-            logging.info(
-                f"Creating SASL password-protected producer to {args.kafka_host}"
-            )
-            producer = kafka.KafkaProducer(
-                bootstrap_servers=kafka_host,
-                # api_version=(0, 10),
-                security_protocol="SASL_SSL",
-                sasl_mechanism="SCRAM-SHA-512",
-                sasl_plain_username=args.kafka_username,
-                sasl_plain_password=args.kafka_password,
-                request_timeout_ms=args.kafka_request_timeout_ms,
-                retries=args.kafka_retries,
-            )
-        else:
-            logging.info(f"Creating passwordless producer to {args.kafka_host}")
-            producer = kafka.KafkaProducer(
-                bootstrap_servers=kafka_host,
-                api_version=(0, 10),
-                request_timeout_ms=args.kafka_request_timeout_ms,
-                retries=args.kafka_retries,
-            )
+    logging.info(f"Creating producer to {args.kafka_host}")
+
+    # With MSK, a few % of connections usually drop with a BrokerNotAvailable error, so we need to retry here.
+    # The one-liner below overrides the args.py default of 0 retries with 3.
+    args.kafka_retries = 3 if args.kafka_retries == 0 else args.kafka_retries
 
-    status_data.set("parameters.kafka.bootstrap", kafka_host)
+    producer = kafka_init.get_producer(args)
 
     logging.info("Creating data structure to store list of accounts and so")
     collect_info = {"accounts": {}}  # simplified info about hosts
diff --git a/opl/kafka_init.py b/opl/kafka_init.py
new file mode 100644
index 0000000..532e834
--- /dev/null
+++ b/opl/kafka_init.py
@@ -0,0 +1,117 @@
+from kafka import KafkaProducer, KafkaConsumer
+
+import logging
+
+# Common instantiators for KafkaProducer and KafkaConsumer
+
+
+class kafka_init:
+    @staticmethod
+    def kafka_bootstrap(args):
+        try:
+            return args.kafka_bootstrap
+        except AttributeError:
+            try:
+                if args.kafka_hosts != "":
+                    return args.kafka_hosts.split(",")
+            except AttributeError:
+                pass
+            return f"{args.kafka_host}:{args.kafka_port}"
+
+    # Based on the args, obtain KafkaProducer instance
+    @staticmethod
+    def get_producer(args, status_data=None):
+        bootstrap_servers = kafka_init.kafka_bootstrap(args)
+
+        # Sanitize acks setting
+        if args.kafka_acks != "all":
+            args.kafka_acks = int(args.kafka_acks)
+
+        if hasattr(args, "dry_run") and args.dry_run:
+            logging.info("NOT creating a producer as this is a dry run")
+            producer = None
+        else:
+            common_params = {
+                "bootstrap_servers": bootstrap_servers,
+                "acks": getattr(args, "kafka_acks", None),
+                "request_timeout_ms": getattr(args, "kafka_request_timeout_ms", 30000),
+                "max_block_ms": getattr(args, "kafka_max_block_ms", 60000),
+                "linger_ms": getattr(args, "kafka_linger_ms", 0),
+                "compression_type": getattr(args, "kafka_compression_type", None),
+                "batch_size": getattr(args, "kafka_batch_size", 16384),
+                "buffer_memory": getattr(args, "kafka_buffer_memory", 33554432),
+                "retries": getattr(args, "kafka_retries", 0),
+            }
+
+            if args.kafka_username != "" and args.kafka_password != "":
+                logging.info(
+                    f"Creating SASL password-protected producer to {bootstrap_servers}"
+                )
+                sasl_params = {
+                    "security_protocol": "SASL_SSL",
"sasl_mechanism": "SCRAM-SHA-512", + "sasl_plain_username": args.kafka_username, + "sasl_plain_password": args.kafka_password, + } + producer = KafkaProducer(**common_params, **sasl_params) + else: + logging.info(f"Creating passwordless producer to {bootstrap_servers}") + producer = KafkaProducer(**common_params) + + if status_data is not None: + status_data.set("parameters.kafka.bootstrap", bootstrap_servers) + status_data.set("parameters.kafka.group", args.kafka_group) + status_data.set("parameters.kafka.topic", args.kafka_topic) + status_data.set("parameters.kafka.timeout", args.kafka_timeout) + + return producer + + # Based on the args, obtain KafkaConsumer instance. + # If args.kafka_topic is supplied, subscribe to the topic. + @staticmethod + def get_consumer(args, status_data=None): + bootstrap_servers = kafka_init.kafka_bootstrap(args) + + # Common parameters for both cases + common_params = { + "bootstrap_servers": bootstrap_servers, + "auto_offset_reset": getattr(args, "kafka_auto_offset_reset", "latest"), + "enable_auto_commit": getattr(args, "kafka_enable_auto_commit", False), + "max_poll_records": getattr(args, "kafka_max_poll_records", 50), + "max_poll_interval_ms": getattr(args, "kafka_max_poll_interval_ms", 300000), + "group_id": getattr(args, "kafka_group", None), + "session_timeout_ms": getattr(args, "kafka_session_timeout_ms", 50000), + "heartbeat_interval_ms": getattr( + args, "kafka_heartbeat_interval_ms", 10000 + ), + "consumer_timeout_ms": getattr(args, "kafka_timeout", 100000), + } + + # Kafka consumer creation: SASL or noauth + if args.kafka_username != "" and args.kafka_password != "": + logging.info( + f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms" + ) + sasl_params = { + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-512", + "sasl_plain_username": args.kafka_username, + "sasl_plain_password": args.kafka_password, + } + consumer = KafkaConsumer(**common_params, **sasl_params) + else: + logging.info( + f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms" + ) + consumer = KafkaConsumer(**common_params) + + if status_data is not None: + status_data.set("parameters.kafka.bootstrap", bootstrap_servers) + status_data.set("parameters.kafka.group", args.kafka_group) + status_data.set("parameters.kafka.topic", args.kafka_topic or "") + status_data.set("parameters.kafka.timeout", args.kafka_timeout) + + if args.kafka_topic and args.kafka_topic != "": + consumer.subscribe([args.kafka_topic]) + + return consumer diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py index 3bc5774..d5fb05e 100644 --- a/opl/post_kafka_times.py +++ b/opl/post_kafka_times.py @@ -7,12 +7,11 @@ import threading import time -from kafka import KafkaProducer - import opl.args import opl.data import opl.db import opl.skelet +from opl.kafka_init import kafka_init import psycopg2 import psycopg2.extras @@ -135,7 +134,6 @@ def __init__(self, args, config, produce_here, save_here): self.config = config self.produce_here = produce_here self.save_here = save_here - self.kafka_topic = args.kafka_topic self.show_processed_messages = args.show_processed_messages self.rate = args.rate @@ -197,7 +195,7 @@ def wait_for_next_second(second=int(time.perf_counter())): if self.show_processed_messages: print(f"Producing {json.dumps(send_params, sort_keys=True)}") - future = 
-            future = self.produce_here.send(self.kafka_topic, **send_params)
+            future = self.produce_here.send(self.args.kafka_topic, **send_params)
             future.add_callback(handle_send_success, message_id=message_id)
             future.add_errback(handle_send_error, message_id=message_id)
 
@@ -276,36 +274,7 @@ def produce_thread(args, config, produce_here, save_here):
     args_copy["tables_definition"] = args_copy["tables_definition"].name
     status_data.set("parameters.produce_messages", args_copy)
 
-    # Sanitize acks setting
-    if args.kafka_acks != "all":
-        args.kafka_acks = int(args.kafka_acks)
-
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    # Common parameters for both cases
-    common_params = {
-        "bootstrap_servers": kafka_host,
-        "acks": args.kafka_acks,
-        "retries": args.kafka_retries,
-        "batch_size": args.kafka_batch_size,
-        "buffer_memory": args.kafka_buffer_memory,
-        "linger_ms": args.kafka_linger_ms,
-        "max_block_ms": args.kafka_max_block_ms,
-        "request_timeout_ms": args.kafka_request_timeout_ms,
-        "compression_type": args.kafka_compression_type,
-    }
-
-    if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(f"Creating SASL password-protected producer to {kafka_host}")
-        sasl_params = {
-            "security_protocol": "SASL_SSL",
-            "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.kafka_username,
-            "sasl_plain_password": args.kafka_password,
-        }
-        produce_here = KafkaProducer(**common_params, **sasl_params)
-    else:
-        logging.info(f"Creating passwordless producer to {kafka_host}")
-        produce_here = KafkaProducer(**common_params)
+    produce_here = kafka_init.get_producer(args)
 
     logging.info(f"Loading queries definition from {args.tables_definition}")
     queries_definition = yaml.load(args.tables_definition, Loader=yaml.SafeLoader)[
diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 057d49a..cedde49 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -5,45 +5,11 @@
 import os
 import time
 
-from kafka import KafkaConsumer
-
+from .kafka_init import kafka_init
 from . import args
 from . import skelet
 
 
-def create_consumer(args):
-    # Common parameters for both cases
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    common_params = {
-        "bootstrap_servers": kafka_host,
-        "auto_offset_reset": "latest",
-        "enable_auto_commit": True,
-        "group_id": args.kafka_group,
-        "session_timeout_ms": 50000,
-        "heartbeat_interval_ms": 10000,
-        "consumer_timeout_ms": args.kafka_timeout,
-    }
-
-    # Kafka consumer creation: SASL or noauth
-    if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(
-            f"Creating SASL password-protected Kafka consumer for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
-        )
-        sasl_params = {
-            "security_protocol": "SASL_SSL",
-            "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.kafka_username,
-            "sasl_plain_password": args.kafka_password,
-        }
-        consumer = KafkaConsumer(args.kafka_topic, **common_params, **sasl_params)
-    else:
-        logging.info(
-            f"Creating passwordless producer for for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
-        )
-        consumer = KafkaConsumer(args.kafka_topic, **common_params)
-    return consumer
-
-
 def doit_seek_to_end(args):
     """
     Create consumer and seek to end
@@ -54,7 +20,8 @@
     on multiple pods.
     """
 
-    consumer = create_consumer(args)
+    args.kafka_enable_auto_commit = True
+    consumer = kafka_init.get_consumer(args)
 
     # Seek to end
     for attempt in range(10):