From 6ef513b9bfaa6c92aecbecc6ec7ad616d7be615c Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 11:40:12 +0100
Subject: [PATCH 01/31] fix(skip_to_end): Supply server port as well

---
 opl/skip_to_end.py | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 2c6b7cf..0359f52 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -13,8 +13,9 @@ def create_consumer(args):
     # Common parameters for both cases
+    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
     common_params = {
-        "bootstrap_servers": args.kafka_host,
+        "bootstrap_servers": kafka_host,
         "auto_offset_reset": "latest",
         "enable_auto_commit": True,
         "group_id": args.kafka_group,
@@ -26,7 +27,7 @@ def create_consumer(args):
     # Kafka consumer creation: SASL or noauth
     if args.kafka_username != "" and args.kafka_password != "":
         logging.info(
-            f"Creating SASL password-protected Kafka consumer for {args.kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
+            f"Creating SASL password-protected Kafka consumer for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
         )
         sasl_params = {
             "security_protocol": "SASL_SSL",
@@ -34,12 +35,12 @@ def create_consumer(args):
             "sasl_plain_username": args.kafka_username,
             "sasl_plain_password": args.kafka_password,
         }
-        consumer = KafkaConsumer([args.kafka_topic], **common_params, **sasl_params)
+        consumer = KafkaConsumer(args.kafka_topic, **common_params, **sasl_params)
     else:
         logging.info(
-            f"Creating passwordless producer for for {args.kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
+            f"Creating passwordless consumer for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
         )
-        consumer = KafkaConsumer([args.kafka_topic], **common_params)
+        consumer = KafkaConsumer(args.kafka_topic, **common_params)
     return consumer
 
@@ -92,6 +93,10 @@ def main():
         help="Topic for which to skip to end (also use env variable KAFKA_TOPIC)",
     )
     args.add_kafka_opts(parser)
+    if args.debug:
+        logging.basicConfig(format=fmt, level=logging.DEBUG)
+
+    logging.basicConfig(level=logging.INFO)
 
     with skelet.test_setup(parser) as (params, status_data):
         doit(params, status_data)

From 971f96a8864d6c7d563dd3bf9fef01d1b07be3c5 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 11:47:16 +0100
Subject: [PATCH 02/31] refactor(post_kafka): Unify kafka_host assignment
 before common_params

---
 opl/post_kafka_times.py | 27 ++++++++++++---------------
 1 file changed, 12 insertions(+), 15 deletions(-)

diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py
index 69ddff7..5c62598 100644
--- a/opl/post_kafka_times.py
+++ b/opl/post_kafka_times.py
@@ -280,23 +280,22 @@ def produce_thread(args, config, produce_here, save_here):
     if args.kafka_acks != "all":
         args.kafka_acks = int(args.kafka_acks)
 
+    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
     # Common parameters for both cases
     common_params = {
-        "bootstrap_servers": [f"{args.kafka_host}:{args.kafka_port}"],
-        "acks": args.kafka_acks,
-        "retries": args.kafka_retries,
-        "batch_size": args.kafka_batch_size,
-        "buffer_memory": args.kafka_buffer_memory,
-        "linger_ms": args.kafka_linger_ms,
-        "max_block_ms": args.kafka_max_block_ms,
-        "request_timeout_ms": args.kafka_request_timeout_ms,
-        "compression_type": args.kafka_compression_type,
+        "bootstrap_servers": kafka_host,
+        "acks": args.acks,
+        "retries": args.retries,
+        "batch_size": args.batch_size,
+        "buffer_memory": args.buffer_memory,
+        "linger_ms": args.linger_ms,
+        "max_block_ms": args.max_block_ms,
+        "request_timeout_ms": args.request_timeout_ms,
+        "compression_type": args.compression_type,
     }
 
     if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(
-            f"Creating SASL password-protected producer to {args.kafka_host}"
-        )
+        logging.info(f"Creating SASL password-protected producer to {kafka_host}")
         sasl_params = {
             "security_protocol": "SASL_SSL",
             "sasl_mechanism": "SCRAM-SHA-512",
@@ -305,9 +304,7 @@
         }
         produce_here = KafkaProducer(**common_params, **sasl_params)
     else:
-        logging.info(
-            f"Creating passwordless producer to {args.kafka_host}:{args.kafka_port}"
-        )
+        logging.info(f"Creating passwordless producer to {kafka_host}")
         produce_here = KafkaProducer(**common_params)
 
     logging.info(f"Loading queries definition from {args.tables_definition}")

From f3bba4b7111c763bc18c394e1aef952131168985 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 11:50:03 +0100
Subject: [PATCH 03/31] fix(get_kafka): Omit list brackets when supplying the
 topic to KafkaConsumer

---
 opl/get_kafka_times.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index 1ec702a..3cad994 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -106,10 +106,10 @@ def create_consumer(self):
             "sasl_plain_username": self.kafka_username,
             "sasl_plain_password": self.kafka_password,
         }
-        consumer = KafkaConsumer([self.kafka_topic], **common_params, **sasl_params)
+        consumer = KafkaConsumer(self.kafka_topic, **common_params, **sasl_params)
     else:
         logging.info(f"Creating passwordless consumer to {self.kafka_host}")
-        consumer = KafkaConsumer([self.kafka_topic], **common_params)
+        consumer = KafkaConsumer(self.kafka_topic, **common_params)
     return consumer
 
     def store_now(self):

From dd13bf72957164578c0898cdbcd65ed950b899e7 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 11:50:32 +0100
Subject: [PATCH 04/31] feat(skelet): Add INFO output option

---
 opl/skelet.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/opl/skelet.py b/opl/skelet.py
index cc6f00c..d9f64b6 100644
--- a/opl/skelet.py
+++ b/opl/skelet.py
@@ -13,11 +13,14 @@ def test_setup(parser):
         help='File where we maintain metadata, results, parameters and measurements for this test run (also use env variable STATUS_DATA_FILE, default to "/tmp/status-data.json")',
     )
     parser.add_argument("-d", "--debug", action="store_true", help="Show debug output")
+    parser.add_argument("-i", "--info", action="store_true", help="Show info output")
     args = parser.parse_args()
 
     fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
     if args.debug:
         logging.basicConfig(format=fmt, level=logging.DEBUG)
+    elif args.info:
+        logging.basicConfig(format=fmt, level=logging.INFO)
     else:
         logging.basicConfig(format=fmt)

From 39f9234301dd59166eb395a01432589c0d111059 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 12:05:52 +0100
Subject: [PATCH 05/31] lint(skip_to_end)

---
 opl/skip_to_end.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 0359f52..6c520b4 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -95,7 +95,7 @@ def main():
     args.add_kafka_opts(parser)
     if args.debug:
         logging.basicConfig(format=fmt, level=logging.DEBUG)
-
+
     logging.basicConfig(level=logging.INFO)
 
     with skelet.test_setup(parser) as (params, status_data):
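A note on the bracket removals in PATCH 01 and PATCH 03 above: kafka-python's KafkaConsumer takes its topics as positional *topics arguments, so wrapping the topic name in a list makes the client try to subscribe to a list object instead of a topic string. A minimal sketch of the intended call shape, assuming kafka-python is installed (broker, group and topic names here are made up for illustration):

    from kafka import KafkaConsumer

    # Topic goes in bare; ["platform.some.topic"] would be wrong here.
    consumer = KafkaConsumer(
        "platform.some.topic",                       # hypothetical topic
        bootstrap_servers="kafka.example.com:9092",  # host:port, as PATCH 01 builds it
        group_id="perf-test",                        # hypothetical group
        consumer_timeout_ms=10000,
    )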
From be32c45fd19622291c53a0f90162ce4a2be32fad Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 14:36:33 +0100
Subject: [PATCH 06/31] fix(post_kafka): Restore the kafka_ prefix

---
 opl/post_kafka_times.py | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py
index 5c62598..3bc5774 100644
--- a/opl/post_kafka_times.py
+++ b/opl/post_kafka_times.py
@@ -284,14 +284,14 @@ def produce_thread(args, config, produce_here, save_here):
     # Common parameters for both cases
     common_params = {
         "bootstrap_servers": kafka_host,
-        "acks": args.acks,
-        "retries": args.retries,
-        "batch_size": args.batch_size,
-        "buffer_memory": args.buffer_memory,
-        "linger_ms": args.linger_ms,
-        "max_block_ms": args.max_block_ms,
-        "request_timeout_ms": args.request_timeout_ms,
-        "compression_type": args.compression_type,
+        "acks": args.kafka_acks,
+        "retries": args.kafka_retries,
+        "batch_size": args.kafka_batch_size,
+        "buffer_memory": args.kafka_buffer_memory,
+        "linger_ms": args.kafka_linger_ms,
+        "max_block_ms": args.kafka_max_block_ms,
+        "request_timeout_ms": args.kafka_request_timeout_ms,
+        "compression_type": args.kafka_compression_type,
     }
 
     if args.kafka_username != "" and args.kafka_password != "":

From 97080687c129c54d69794f9f9506b604f985ece8 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 14:37:05 +0100
Subject: [PATCH 07/31] feat(skelet setup): Rename --info to --verbose to be
 unified with others

---
 opl/skelet.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/opl/skelet.py b/opl/skelet.py
index d9f64b6..a1ac348 100644
--- a/opl/skelet.py
+++ b/opl/skelet.py
@@ -13,13 +13,15 @@ def test_setup(parser):
         help='File where we maintain metadata, results, parameters and measurements for this test run (also use env variable STATUS_DATA_FILE, default to "/tmp/status-data.json")',
     )
     parser.add_argument("-d", "--debug", action="store_true", help="Show debug output")
-    parser.add_argument("-i", "--info", action="store_true", help="Show info output")
+    parser.add_argument(
+        "-v", "--verbose", action="store_true", help="Be verbose - show info output"
+    )
     args = parser.parse_args()
 
     fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
     if args.debug:
         logging.basicConfig(format=fmt, level=logging.DEBUG)
-    elif args.info:
+    elif args.verbose:
         logging.basicConfig(format=fmt, level=logging.INFO)
     else:
         logging.basicConfig(format=fmt)

From 0a354643c707b6032df025eba8c8735bea2ed52c Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 12 Jan 2024 14:43:54 +0100
Subject: [PATCH 08/31] fix(skip_end): Omit explicit logging level logic, use
 what's in skelet already

---
 opl/skip_to_end.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 6c520b4..057d49a 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -93,10 +93,6 @@ def main():
         help="Topic for which to skip to end (also use env variable KAFKA_TOPIC)",
     )
     args.add_kafka_opts(parser)
-    if args.debug:
-        logging.basicConfig(format=fmt, level=logging.DEBUG)
-
-    logging.basicConfig(level=logging.INFO)
 
     with skelet.test_setup(parser) as (params, status_data):
         doit(params, status_data)

From 3e2ce80ca69017e9ed2bad1d5c029d979fe6fe1b Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Tue, 13 Feb 2024 19:55:59 +0100
Subject: [PATCH 09/31] feat(HBI): Use retries and timeout from args in
 KafkaProducer

---
 opl/hbi_utils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/opl/hbi_utils.py b/opl/hbi_utils.py
index cbe9bab..7dc5480 100644
--- a/opl/hbi_utils.py
+++ b/opl/hbi_utils.py
@@ -199,12 +199,16 @@ def gen_send_verify(args, status_data):
             sasl_mechanism="SCRAM-SHA-512",
             sasl_plain_username=args.kafka_username,
             sasl_plain_password=args.kafka_password,
+            request_timeout_ms=args.kafka_request_timeout_ms,
+            retries=args.kafka_retries,
         )
     else:
         logging.info(f"Creating passwordless producer to {args.kafka_host}")
         producer = kafka.KafkaProducer(
             bootstrap_servers=kafka_host,
             api_version=(0, 10),
+            request_timeout_ms=args.kafka_request_timeout_ms,
+            retries=args.kafka_retries,
         )
 
     status_data.set("parameters.kafka.bootstrap", kafka_host)
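Standalone, the producer shape PATCH 09 converges on looks roughly like the sketch below; the broker address is made up, and the literal values stand in for args.kafka_request_timeout_ms and args.kafka_retries:

    import kafka

    producer = kafka.KafkaProducer(
        bootstrap_servers="kafka.example.com:9092",  # hypothetical broker
        api_version=(0, 10),
        request_timeout_ms=30000,  # normally taken from args.kafka_request_timeout_ms
        retries=3,                 # normally taken from args.kafka_retries
    )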
From c19beda96a7f96af004a3af3ea6edff0ec335d89 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Wed, 14 Feb 2024 20:22:19 +0100
Subject: [PATCH 10/31] refactor(Kafka): Extract and unify KafkaConsumer &
 KafkaProducer constructors, feat: use multiple brokers to disperse load with
 arg `--kafka-hosts`

---
 opl/args.py             |   7 ++-
 opl/consumer_lag.py     |  58 ++++++---------------
 opl/get_kafka_times.py  |  48 +++---------------
 opl/hbi_utils.py        |  35 +++-----------
 opl/kafka_init.py       | 109 ++++++++++++++++++++++++++++++++++++++++
 opl/post_kafka_times.py |  34 +------------
 opl/skip_to_end.py      |  41 ++--------------
 7 files changed, 150 insertions(+), 182 deletions(-)
 create mode 100644 opl/kafka_init.py

diff --git a/opl/args.py b/opl/args.py
index 1fb7539..c74177d 100644
--- a/opl/args.py
+++ b/opl/args.py
@@ -92,6 +92,11 @@ def add_kafka_opts(parser):
         default=os.getenv("KAFKA_HOST", "localhost"),
         help="Kafka host (also use env variable KAFKA_HOST)",
     )
+    parser.add_argument(
+        "--kafka-hosts",
+        default=os.getenv("KAFKA_HOSTS", ""),
+        help="Comma-separated list of hosts, including their ports (also use env variable KAFKA_HOSTS)",
+    )
     parser.add_argument(
         "--kafka-port",
         type=int,
@@ -131,7 +136,7 @@ def add_kafka_opts(parser):
         help="The client is going to wait this much time for the server to respond to a request (also use env variable KAFKA_REQUEST_TIMEOUT_MS)",
     )
     parser.add_argument(
-        "--kafka-max_block_ms",
+        "--kafka-max-block-ms",
         type=int,
         default=int(os.getenv("KAFKA_MAX_BLOCK_MS", 60000)),
         help="Max time to block send e.g. because buffer is full (also use env variable KAFKA_MAX_BLOCK_MS)",
diff --git a/opl/consumer_lag.py b/opl/consumer_lag.py
index 26c0f43..57ef470 100644
--- a/opl/consumer_lag.py
+++ b/opl/consumer_lag.py
@@ -2,9 +2,10 @@
 import logging
 import time
 
-from kafka import KafkaConsumer
 from kafka import TopicPartition
 
+import opl.kafka_init as kafka_init
+
 
 class ConsumerLag:
     """
@@ -12,57 +13,28 @@ class ConsumerLag:
     bootstrap_server and kafka group as input.
     """
 
-    def __init__(
-        self, topic, bootstrap_servers, group, username="", password=""
-    ) -> None:
-        self.topic = topic
-        self.group = group
-        self.bootstrap_servers = bootstrap_servers
+    def __init__(args, kafka_topic) -> None:
+        self.args = args
+        self.args.kafka_topic = kafka_topic
         self.logger = logging.getLogger("consumer_lag")
         self.offset_records = {}
-        self.username = username
-        self.password = password
 
     def _getconsumer(self):
-        # Common parameters for both cases
-        common_params = {
-            "bootstrap_servers": self.bootstrap_servers,
-            "auto_offset_reset": "latest",
-            "enable_auto_commit": False,
-            "max_poll_records": 50,
-            "max_poll_interval_ms": 300000,
-            "group_id": self.group,
-            "session_timeout_ms": 50000,
-            "heartbeat_interval_ms": 10000,
-            "consumer_timeout_ms": 100000,
-        }
+        self.args.max_poll_records = 50
+        self.args.max_poll_interval_ms = 300000
+        self.args.session_timeout_ms = 50000
+        self.args.heartbeat_interval_ms = 10000
+        self.args.kafka_timeout = 100000
 
-        # Kafka consumer creation: SASL or noauth
-        if self.username != "" and self.password != "":
-            logging.info(
-                f"Creating SASL password-protected Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
-            )
-            sasl_params = {
-                "security_protocol": "SASL_SSL",
-                "sasl_mechanism": "SCRAM-SHA-512",
-                "sasl_plain_username": self.username,
-                "sasl_plain_password": self.password,
-            }
-            consumer = KafkaConsumer(**common_params, **sasl_params)
-        else:
-            logging.info(
-                f"Creating passwordless Kafka consumer for {self.bootstrap_servers} in group {self.group} with timeout {common_params['session_timeout_ms']} ms"
-            )
-            consumer = KafkaConsumer(**common_params)
-        return consumer
+        return kafka_init.get_consumer(self.args)
 
     def store_offset_records(self):
         consumer = self._getconsumer()
-        partition_set = consumer.partitions_for_topic(self.topic)
+        partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
         counter = 0
         while counter < 5:
             counter += 1
-            partition_set = consumer.partitions_for_topic(self.topic)
+            partition_set = consumer.partitions_for_topic(self.args.kafka_topic)
             if partition_set:
                 break
             else:
@@ -70,7 +42,7 @@ def store_offset_records(self):
 
         partitions = []
         for partition_id in partition_set:
-            partitions.append(TopicPartition(self.topic, partition_id))
+            partitions.append(TopicPartition(self.args.kafka_topic, partition_id))
 
         curr_offsets = {}
         for partition in partitions:
@@ -83,7 +55,7 @@ def store_offset_records(self):
             record = {
                 "curr_offset": value,
                 "end_offset": end_offsets[
-                    TopicPartition(topic=self.topic, partition=partition_id)
+                    TopicPartition(topic=self.args.kafka_topic, partition=partition_id)
                 ],
             }
             self.offset_records[partition_id] = record
diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index 3cad994..e8dcc64 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -5,12 +5,11 @@
 import logging
 import os
 
-from kafka import KafkaConsumer
-
 import opl.args
 import opl.data
 import opl.db
 import opl.skelet
+import opl.kafka_init as kafka_init
 
 import psycopg2
 import psycopg2.extras
@@ -28,14 +27,10 @@ def __init__(self, args, status_data, custom_methods):
             "password": args.storage_db_pass,
         }
         self.connection = psycopg2.connect(**storage_db_conf)
+
         self.status_data = status_data
-        self.kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-        self.kafka_group = args.kafka_group
-        self.kafka_topic = args.kafka_topic
-        self.kafka_timeout = args.kafka_timeout
-        self.kafka_max_poll_records = 100
-        self.kafka_username = args.kafka_username
-        self.kafka_password = args.kafka_password
+        self.args = args
+
         self.queries_definition = yaml.load(
             args.tables_definition, Loader=yaml.SafeLoader
         )["queries"]
@@ -79,38 +74,9 @@ def kafka_ts2dt(self, timestamp):
         )
 
     def create_consumer(self):
-        # Store Kafka config to status data
-        self.status_data.set("parameters.kafka.bootstrap", self.kafka_host)
-        self.status_data.set("parameters.kafka.group", self.kafka_group)
-        self.status_data.set("parameters.kafka.topic", self.kafka_topic)
-        self.status_data.set("parameters.kafka.timeout", self.kafka_timeout)
-
-        common_params = {
-            "bootstrap_servers": self.kafka_host,
-            "auto_offset_reset": "earliest",
-            "enable_auto_commit": True,
-            "group_id": self.kafka_group,
-            "max_poll_records": self.kafka_max_poll_records,
-            "session_timeout_ms": 50000,
-            "heartbeat_interval_ms": 10000,
-            "consumer_timeout_ms": self.kafka_timeout,
-        }
-
-        if self.kafka_username != "" and self.kafka_password != "":
-            logging.info(
-                f"Creating consumer with sasl username&pasword to {self.kafka_host}"
-            )
-            sasl_params = {
-                "security_protocol": "SASL_SSL",
-                "sasl_mechanism": "SCRAM-SHA-512",
-                "sasl_plain_username": self.kafka_username,
-                "sasl_plain_password": self.kafka_password,
-            }
-            consumer = KafkaConsumer(self.kafka_topic, **common_params, **sasl_params)
-        else:
-            logging.info(f"Creating passwordless consumer to {self.kafka_host}")
-            consumer = KafkaConsumer(self.kafka_topic, **common_params)
-        return consumer
+        self.args.auto_offset_reset = "earliest"
+        self.args.enable_auto_commit = True
+        return kafka_init.get_consumer(self.args, self.status_data)
 
     def store_now(self):
         """
diff --git a/opl/hbi_utils.py b/opl/hbi_utils.py
index 7dc5480..26b9eb5 100644
--- a/opl/hbi_utils.py
+++ b/opl/hbi_utils.py
@@ -10,6 +10,7 @@
 import opl.db
 import opl.generators.inventory_ingress
 import opl.skelet
+import opl.kafka_init as kafka_init
 
 import psycopg2
 
@@ -183,35 +184,13 @@ def gen_send_verify(args, status_data):
         inventory
     )  # fetch existing records count
 
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    logging.info(f"Creating producer to {kafka_host}")
-    if args.dry_run:
-        producer = None
-    else:
-        if args.kafka_username != "" and args.kafka_password != "":
-            logging.info(
-                f"Creating SASL password-protected producer to {args.kafka_host}"
-            )
-            producer = kafka.KafkaProducer(
-                bootstrap_servers=kafka_host,
-                # api_version=(0, 10),
-                security_protocol="SASL_SSL",
-                sasl_mechanism="SCRAM-SHA-512",
-                sasl_plain_username=args.kafka_username,
-                sasl_plain_password=args.kafka_password,
-                request_timeout_ms=args.kafka_request_timeout_ms,
-                retries=args.kafka_retries,
-            )
-        else:
-            logging.info(f"Creating passwordless producer to {args.kafka_host}")
-            producer = kafka.KafkaProducer(
-                bootstrap_servers=kafka_host,
-                api_version=(0, 10),
-                request_timeout_ms=args.kafka_request_timeout_ms,
-                retries=args.kafka_retries,
-            )
+    logging.info(f"Creating producer to {args.kafka_host}")
+
+    # With MSK, a few % of connections usually drop with BrokerNotAvailable error so we need to retry here.
+    # This one-liner below overrides args.py's default of 0 retries to 3.
+    args.kafka_retries = 3 if args.kafka_retries == 0 else args.kafka_retries
 
-    status_data.set("parameters.kafka.bootstrap", kafka_host)
+    producer = kafka_init.get_producer(args)
 
     logging.info("Creating data structure to store list of accounts and so")
     collect_info = {"accounts": {}}  # simplified info about hosts
diff --git a/opl/kafka_init.py b/opl/kafka_init.py
new file mode 100644
index 0000000..0d6b865
--- /dev/null
+++ b/opl/kafka_init.py
@@ -0,0 +1,109 @@
+import kafka
+
+# from . import status_data
+import logging
+
+### Common instantiators for KafkaProducer and KafkaConsumer
+
+
+def kafka_bootstrap(args):
+    if args.kafka_bootstrap:
+        return args.kafka_bootstrap
+    if args.kafka_hosts != None and args.kafka_hosts != "":
+        return args.kafka_hosts.split(",")
+    else:
+        return f"{args.kafka_host}:{args.kafka_port}"
+
+
+# From args, obtain
+def get_producer(args, status_data=None):
+    bootstrap_servers = kafka_bootstrap(args)
+
+    # Sanitize acks setting
+    if args.kafka_acks != "all":
+        args.kafka_acks = int(args.kafka_acks)
+
+    if args.dry_run:
+        logging.info(f"NOT creating a producer as this is a dry run")
+        producer = None
+    else:
+        common_params = {
+            "bootstrap_servers": bootstrap_servers,
+            "acks": args.kafka_acks,
+            "request_timeout_ms": args.kafka_request_timeout_ms or 30000,
+            "max_block_ms": args.kafka_max_block_ms or 60000,
+            "linger_ms": args.kafka_linger_ms or 0,
+            "compression_type": args.kafka_compression_type or None,
+            "batch_size": args.kafka_batch_size or 16384,
+            "buffer_memory": args.kafka_buffer_memory or 33554432,
+            "retries": args.kafka_retries or 0,
+        }
+
+        if args.kafka_username != "" and args.kafka_password != "":
+            logging.info(
+                f"Creating SASL password-protected producer to {bootstrap_servers}"
+            )
+            sasl_params = {
+                "security_protocol": "SASL_SSL",
+                "sasl_mechanism": "SCRAM-SHA-512",
+                "sasl_plain_username": args.kafka_username,
+                "sasl_plain_password": args.kafka_password,
+            }
+            producer = KafkaProducer(**common_params, **sasl_params)
+        else:
+            logging.info(f"Creating passwordless producer to {bootstrap_servers}")
+            producer = KafkaProducer(**common_params)
+
+    if status_data != None:
+        status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
+        status_data.set("parameters.kafka.group", args.kafka_group)
+        status_data.set("parameters.kafka.topic", args.kafka_topic)
+        status_data.set("parameters.kafka.timeout", args.kafka_timeout)
+
+    return producer
+
+
+def get_consumer(args, status_data=None):
+    bootstrap_servers = kafka_bootstrap(args)
+
+    # Common parameters for both cases
+    common_params = {
+        "bootstrap_servers": args.bootstrap_servers,
+        "auto_offset_reset": args.auto_offset_reset or "latest",
+        "enable_auto_commit": args.enable_auto_commit or False,
+        "max_poll_records": args.max_poll_records or 50,
+        "max_poll_interval_ms": args.max_poll_interval_ms or 300000,
+        "group_id": args.kafka_group,
+        "session_timeout_ms": args.session_timeout_ms or 50000,
+        "heartbeat_interval_ms": args.heartbeat_interval_ms or 10000,
+        "consumer_timeout_ms": args.kafka_timeout or 100000,
+    }
+
+    # Kafka consumer creation: SASL or noauth
+    if args.username != "" and args.password != "":
+        logging.info(
+            f"Creating SASL password-protected Kafka consumer for {args.bootstrap_servers} in group {args.kafka_group} with timeout {args.session_timeout_ms or 50000} ms"
+        )
+        sasl_params = {
+            "security_protocol": "SASL_SSL",
+            "sasl_mechanism": "SCRAM-SHA-512",
+            "sasl_plain_username": args.username,
+            "sasl_plain_password": args.password,
+        }
+        consumer = KafkaConsumer(**common_params, **sasl_params)
+    else:
+        logging.info(
+            f"Creating passwordless Kafka consumer for {args.bootstrap_servers} in group {args.kafka_group} with timeout {common_params['session_timeout_ms']} ms"
+        )
+        consumer = KafkaConsumer(**common_params)
+
+    if status_data != None:
+        status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
+        status_data.set("parameters.kafka.group", args.kafka_group)
+        status_data.set("parameters.kafka.topic", args.kafka_topic or "")
+        status_data.set("parameters.kafka.timeout", args.kafka_timeout)
+
+    if args.kafka_topic and args.kafka_topic != "":
+        consumer.subscribe([args.kafka_topic])
+
+    return consumer
diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py
index 3bc5774..e20d8f1 100644
--- a/opl/post_kafka_times.py
+++ b/opl/post_kafka_times.py
@@ -7,12 +7,11 @@
 import threading
 import time
 
-from kafka import KafkaProducer
-
 import opl.args
 import opl.data
 import opl.db
 import opl.skelet
+import opl.kafka_init as kafka_init
 
 import psycopg2
 import psycopg2.extras
@@ -276,36 +275,7 @@ def produce_thread(args, config, produce_here, save_here):
     args_copy["tables_definition"] = args_copy["tables_definition"].name
     status_data.set("parameters.produce_messages", args_copy)
 
-    # Sanitize acks setting
-    if args.kafka_acks != "all":
-        args.kafka_acks = int(args.kafka_acks)
-
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    # Common parameters for both cases
-    common_params = {
-        "bootstrap_servers": kafka_host,
-        "acks": args.kafka_acks,
-        "retries": args.kafka_retries,
-        "batch_size": args.kafka_batch_size,
-        "buffer_memory": args.kafka_buffer_memory,
-        "linger_ms": args.kafka_linger_ms,
-        "max_block_ms": args.kafka_max_block_ms,
-        "request_timeout_ms": args.kafka_request_timeout_ms,
-        "compression_type": args.kafka_compression_type,
-    }
-
-    if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(f"Creating SASL password-protected producer to {kafka_host}")
-        sasl_params = {
-            "security_protocol": "SASL_SSL",
-            "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.kafka_username,
-            "sasl_plain_password": args.kafka_password,
-        }
-        produce_here = KafkaProducer(**common_params, **sasl_params)
-    else:
-        logging.info(f"Creating passwordless producer to {kafka_host}")
-        produce_here = KafkaProducer(**common_params)
+    produce_here = kafka_init.get_producer(args)
 
     logging.info(f"Loading queries definition from {args.tables_definition}")
     queries_definition = yaml.load(args.tables_definition, Loader=yaml.SafeLoader)[
diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 057d49a..ebeb96e 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -5,45 +5,11 @@
 import os
 import time
 
-from kafka import KafkaConsumer
-
+from . import kafka_init
 from . import args
 from . import skelet
 
 
-def create_consumer(args):
-    # Common parameters for both cases
-    kafka_host = f"{args.kafka_host}:{args.kafka_port}"
-    common_params = {
-        "bootstrap_servers": kafka_host,
-        "auto_offset_reset": "latest",
-        "enable_auto_commit": True,
-        "group_id": args.kafka_group,
-        "session_timeout_ms": 50000,
-        "heartbeat_interval_ms": 10000,
-        "consumer_timeout_ms": args.kafka_timeout,
-    }
-
-    # Kafka consumer creation: SASL or noauth
-    if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(
-            f"Creating SASL password-protected Kafka consumer for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
-        )
-        sasl_params = {
-            "security_protocol": "SASL_SSL",
-            "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.kafka_username,
-            "sasl_plain_password": args.kafka_password,
-        }
-        consumer = KafkaConsumer(args.kafka_topic, **common_params, **sasl_params)
-    else:
-        logging.info(
-            f"Creating passwordless consumer for {kafka_host} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}"
-        )
-        consumer = KafkaConsumer(args.kafka_topic, **common_params)
-    return consumer
-
-
 def doit_seek_to_end(args):
     """
     Create consumer and seek to end
@@ -54,7 +20,8 @@ def doit_seek_to_end(args):
     on multiple pods.
     """
 
-    consumer = create_consumer(args)
+    args.enable_auto_commit = True
+    consumer = kafka_init.get_consumer(args)
 
     # Seek to end
     for attempt in range(10):
@@ -75,7 +42,7 @@ def doit_seek_to_end(args):
 
 
 def doit(args, status_data):
-    doit_seek_to_end(args)
+    doit_seek_to_end(args, status_data)
 
     status_data.set("parameters.kafka.seek_topic", args.kafka_topic)
     status_data.set("parameters.kafka.seek_timeout", args.kafka_timeout)
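The address resolution introduced in kafka_bootstrap() prefers args.kafka_bootstrap, then splits a comma-separated args.kafka_hosts into a broker list (kafka-python accepts a list for bootstrap_servers, which is what spreads the load across brokers), and finally falls back to args.kafka_host:args.kafka_port. A hypothetical resolution at this point in the series, assuming the opl package and kafka-python are importable (all attribute values below are made up; later patches rework how a missing or None kafka_bootstrap is treated):

    from types import SimpleNamespace

    import opl.kafka_init as kafka_init

    args = SimpleNamespace(
        kafka_bootstrap=None,  # highest priority, left unset here
        kafka_hosts="b1.example.com:9092,b2.example.com:9092",  # wins over host/port
        kafka_host="localhost",
        kafka_port=9092,
    )
    assert kafka_init.kafka_bootstrap(args) == [
        "b1.example.com:9092",
        "b2.example.com:9092",
    ]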
From 54208935d91ae17729f4af806fe214a4462e802e Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Wed, 14 Feb 2024 20:49:17 +0100
Subject: [PATCH 11/31] refactor(postKafkaTimes): Use kafka_topic from args,
 don't duplicate it

---
 opl/post_kafka_times.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py
index e20d8f1..7360986 100644
--- a/opl/post_kafka_times.py
+++ b/opl/post_kafka_times.py
@@ -134,7 +134,6 @@ def __init__(self, args, config, produce_here, save_here):
         self.config = config
         self.produce_here = produce_here
         self.save_here = save_here
-        self.kafka_topic = args.kafka_topic
         self.show_processed_messages = args.show_processed_messages
         self.rate = args.rate
 
@@ -196,7 +195,7 @@ def wait_for_next_second(second=int(time.perf_counter())):
             if self.show_processed_messages:
                 print(f"Producing {json.dumps(send_params, sort_keys=True)}")
 
-            future = self.produce_here.send(self.kafka_topic, **send_params)
+            future = self.produce_here.send(self.args.kafka_topic, **send_params)
             future.add_callback(handle_send_success, message_id=message_id)
             future.add_errback(handle_send_error, message_id=message_id)

From 13402fd66a70b7cb62f5107939d9540ae8086ee3 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Wed, 14 Feb 2024 20:53:58 +0100
Subject: [PATCH 12/31] docs(kafka_init): Add basic function descriptions

---
 opl/kafka_init.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 0d6b865..9a647a4 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -15,7 +15,7 @@ def kafka_bootstrap(args):
         return f"{args.kafka_host}:{args.kafka_port}"
 
 
-# From args, obtain
+# Based on the args, obtain KafkaProducer instance
 def get_producer(args, status_data=None):
     bootstrap_servers = kafka_bootstrap(args)
 
@@ -63,6 +63,8 @@ def get_producer(args, status_data=None):
     return producer
 
 
+# Based on the args, obtain KafkaConsumer instance.
+# If args.kafka_topic is supplied, subscribe to the topic.
 def get_consumer(args, status_data=None):
     bootstrap_servers = kafka_bootstrap(args)

From b9d396b53f46e1407e1b6f68d1368402e14823b6 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Thu, 15 Feb 2024 11:28:09 +0100
Subject: [PATCH 13/31] fix(skiptoend): Remove status_data param from the call
 as it isn't implemented

---
 opl/skip_to_end.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index ebeb96e..8de50dd 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -42,7 +42,7 @@ def doit_seek_to_end(args):
 
 
 def doit(args, status_data):
-    doit_seek_to_end(args, status_data)
+    doit_seek_to_end(args)
 
     status_data.set("parameters.kafka.seek_topic", args.kafka_topic)
     status_data.set("parameters.kafka.seek_timeout", args.kafka_timeout)

From 52e80efa48a2c37f83f3d40e5f4970f58f7ed054 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Thu, 15 Feb 2024 12:32:39 +0100
Subject: [PATCH 14/31] feat(opl args): Improve kafka address arguments to
 clarify priority of kafka-hosts vs. kafka-host

---
 opl/args.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/opl/args.py b/opl/args.py
index c74177d..d63b61a 100644
--- a/opl/args.py
+++ b/opl/args.py
@@ -90,12 +90,12 @@ def add_kafka_opts(parser):
     parser.add_argument(
         "--kafka-host",
         default=os.getenv("KAFKA_HOST", "localhost"),
-        help="Kafka host (also use env variable KAFKA_HOST)",
+        help="Kafka host (also use env variable KAFKA_HOST). Can get overridden by --kafka-hosts arg or KAFKA_HOSTS envvar.",
     )
     parser.add_argument(
         "--kafka-hosts",
         default=os.getenv("KAFKA_HOSTS", ""),
-        help="Comma-separated list of hosts, including their ports (also use env variable KAFKA_HOSTS)",
+        help="Comma-separated list of hosts, including their ports (also use env variable KAFKA_HOSTS). Takes precedence over --kafka-host and --kafka-port or their envvar variants.",
     )
     parser.add_argument(
         "--kafka-port",

From 343b185f1ea7292cdeb056275ba70cdd7d9ce81c Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Thu, 15 Feb 2024 20:31:39 +0100
Subject: [PATCH 15/31] fix(kafka producer init): Handle missing args.dry_run
 without throwing an exception

---
 opl/kafka_init.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 9a647a4..5c7329c 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -1,4 +1,4 @@
-import kafka
+from kafka import KafkaProducer, KafkaConsumer
 
 # from . import status_data
 import logging
@@ -7,12 +7,14 @@
 
 
 def kafka_bootstrap(args):
-    if args.kafka_bootstrap:
+    try:
         return args.kafka_bootstrap
-    if args.kafka_hosts != None and args.kafka_hosts != "":
-        return args.kafka_hosts.split(",")
-    else:
-        return f"{args.kafka_host}:{args.kafka_port}"
+    except AttributeError:
+        try:
+            if args.kafka_hosts != "":
+                return args.kafka_hosts.split(",")
+        except AttributeError:
+            return f"{args.kafka_host}:{args.kafka_port}"
 
 
 # Based on the args, obtain KafkaProducer instance
@@ -23,7 +25,7 @@ def get_producer(args, status_data=None):
     if args.kafka_acks != "all":
         args.kafka_acks = int(args.kafka_acks)
 
-    if args.dry_run:
+    if hasattr(args, "dry_run") and args.dry_run:
         logging.info(f"NOT creating a producer as this is a dry run")
         producer = None
     else:
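Note the edge case this leaves open, which PATCH 17 below closes: an args object whose kafka_hosts is the empty string enters the inner try, skips both return statements and falls off the end of the function, so the caller gets None instead of an address. A minimal reproduction with a made-up namespace:

    from types import SimpleNamespace

    import opl.kafka_init as kafka_init

    args = SimpleNamespace(kafka_hosts="", kafka_host="localhost", kafka_port=9092)
    kafka_init.kafka_bootstrap(args)  # None here; "localhost:9092" after PATCH 17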
From 6157a06fc3bc9acd01e2de9152d00f1ccab78a05 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 10:48:38 +0100
Subject: [PATCH 16/31] fix(kafka init Consumer): Fix bootstrap server path

---
 opl/kafka_init.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 5c7329c..e0b2dfb 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -72,7 +72,7 @@ def get_consumer(args, status_data=None):
 
     # Common parameters for both cases
     common_params = {
-        "bootstrap_servers": args.bootstrap_servers,
+        "bootstrap_servers": bootstrap_servers,
         "auto_offset_reset": args.auto_offset_reset or "latest",
         "enable_auto_commit": args.enable_auto_commit or False,
         "max_poll_records": args.max_poll_records or 50,

From 595e6e56c69a6979ec0e07147fafdbfea4790b86 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 12:52:13 +0100
Subject: [PATCH 17/31] fix(kafka_init bootstrap address): default return to
 the host:port value to handle both AttributeError and empty string of
 kafka_hosts

---
 opl/kafka_init.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index e0b2dfb..cfd33de 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -14,7 +14,8 @@ def kafka_bootstrap(args):
             if args.kafka_hosts != "":
                 return args.kafka_hosts.split(",")
         except AttributeError:
-            return f"{args.kafka_host}:{args.kafka_port}"
+            pass
+    return f"{args.kafka_host}:{args.kafka_port}"

From f957e6659e456f6dfde71563f88f495ba32f74f7 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 13:27:55 +0100
Subject: [PATCH 18/31] fix(consumer_lag): Missing self in constructor args

---
 opl/consumer_lag.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/opl/consumer_lag.py b/opl/consumer_lag.py
index 57ef470..503acef 100644
--- a/opl/consumer_lag.py
+++ b/opl/consumer_lag.py
@@ -13,7 +13,7 @@ class ConsumerLag:
     bootstrap_server and kafka group as input.
     """
 
-    def __init__(args, kafka_topic) -> None:
+    def __init__(self, args, kafka_topic) -> None:
         self.args = args
         self.args.kafka_topic = kafka_topic
         self.logger = logging.getLogger("consumer_lag")

From 3b6c2e515e17dfbf762696f9221a28d5e1ace3d3 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 13:28:24 +0100
Subject: [PATCH 19/31] lint fixes

---
 opl/hbi_utils.py  | 2 --
 opl/kafka_init.py | 8 ++++----
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/opl/hbi_utils.py b/opl/hbi_utils.py
index 26b9eb5..e42338e 100644
--- a/opl/hbi_utils.py
+++ b/opl/hbi_utils.py
@@ -4,8 +4,6 @@
 import threading
 import time
 
-import kafka
-
 import opl.args
 import opl.db
 import opl.generators.inventory_ingress
diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index cfd33de..02de826 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -3,7 +3,7 @@
 # from . import status_data
 import logging
 
-### Common instantiators for KafkaProducer and KafkaConsumer
+## Common instantiators for KafkaProducer and KafkaConsumer
 
 
 def kafka_bootstrap(args):
@@ -27,7 +27,7 @@ def get_producer(args, status_data=None):
         args.kafka_acks = int(args.kafka_acks)
 
     if hasattr(args, "dry_run") and args.dry_run:
-        logging.info(f"NOT creating a producer as this is a dry run")
+        logging.info("NOT creating a producer as this is a dry run")
         producer = None
     else:
         common_params = {
@@ -57,7 +57,7 @@ def get_producer(args, status_data=None):
             logging.info(f"Creating passwordless producer to {bootstrap_servers}")
             producer = KafkaProducer(**common_params)
 
-    if status_data != None:
+    if status_data is not None:
         status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
         status_data.set("parameters.kafka.group", args.kafka_group)
         status_data.set("parameters.kafka.topic", args.kafka_topic)
@@ -102,7 +102,7 @@ def get_consumer(args, status_data=None):
-    if status_data != None:
+    if status_data is not None:
         status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
         status_data.set("parameters.kafka.group", args.kafka_group)
         status_data.set("parameters.kafka.topic", args.kafka_topic or "")
From ce08590d11574f8c313c6dbb3d8df4216b2f90f5 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 13:42:59 +0100
Subject: [PATCH 20/31] refactor(kafka args for init): unify args naming for
 kafka to prefix `kafka_`

---
 opl/consumer_lag.py    |  8 ++++----
 opl/get_kafka_times.py |  4 ++--
 opl/kafka_init.py      | 12 ++++++------
 opl/skip_to_end.py     |  2 +-
 4 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/opl/consumer_lag.py b/opl/consumer_lag.py
index 503acef..4e3e2d5 100644
--- a/opl/consumer_lag.py
+++ b/opl/consumer_lag.py
@@ -20,10 +20,10 @@ def __init__(self, args, kafka_topic) -> None:
         self.offset_records = {}
 
     def _getconsumer(self):
-        self.args.max_poll_records = 50
-        self.args.max_poll_interval_ms = 300000
-        self.args.session_timeout_ms = 50000
-        self.args.heartbeat_interval_ms = 10000
+        self.args.kafka_max_poll_records = 50
+        self.args.kafka_max_poll_interval_ms = 300000
+        self.args.kafka_session_timeout_ms = 50000
+        self.args.kafka_heartbeat_interval_ms = 10000
         self.args.kafka_timeout = 100000
 
         return kafka_init.get_consumer(self.args)
diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index e8dcc64..b7be76a 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -74,8 +74,8 @@ def kafka_ts2dt(self, timestamp):
         )
 
     def create_consumer(self):
-        self.args.auto_offset_reset = "earliest"
-        self.args.enable_auto_commit = True
+        self.args.kafka_auto_offset_reset = "earliest"
+        self.args.kafka_enable_auto_commit = True
         return kafka_init.get_consumer(self.args, self.status_data)
 
     def store_now(self):
diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 02de826..6b90dfc 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -74,13 +74,13 @@ def get_consumer(args, status_data=None):
     # Common parameters for both cases
     common_params = {
         "bootstrap_servers": bootstrap_servers,
-        "auto_offset_reset": args.auto_offset_reset or "latest",
-        "enable_auto_commit": args.enable_auto_commit or False,
-        "max_poll_records": args.max_poll_records or 50,
-        "max_poll_interval_ms": args.max_poll_interval_ms or 300000,
+        "auto_offset_reset": args.kafka_auto_offset_reset or "latest",
+        "enable_auto_commit": args.kafka_enable_auto_commit or False,
+        "max_poll_records": args.kafka_max_poll_records or 50,
+        "max_poll_interval_ms": args.kafka_max_poll_interval_ms or 300000,
         "group_id": args.kafka_group,
-        "session_timeout_ms": args.session_timeout_ms or 50000,
-        "heartbeat_interval_ms": args.heartbeat_interval_ms or 10000,
+        "session_timeout_ms": args.kafka_session_timeout_ms or 50000,
+        "heartbeat_interval_ms": args.kafka_heartbeat_interval_ms or 10000,
         "consumer_timeout_ms": args.kafka_timeout or 100000,
     }
 
diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 8de50dd..81ed4e8 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -20,7 +20,7 @@ def doit_seek_to_end(args):
     on multiple pods.
     """
 
-    args.enable_auto_commit = True
+    args.kafka_enable_auto_commit = True
     consumer = kafka_init.get_consumer(args)
 
     # Seek to end

From f25d937f672c5fcb09bda89929af4b006dd41852 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 15:18:53 +0100
Subject: [PATCH 21/31] fix(get_kafka_times): Add missing
 `kafka_max_poll_records` as 100

---
 opl/get_kafka_times.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index b7be76a..de277d1 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -30,6 +30,7 @@ def __init__(self, args, status_data, custom_methods):
 
         self.status_data = status_data
         self.args = args
+        self.args.kafka_max_poll_records = 100
 
         self.queries_definition = yaml.load(
             args.tables_definition, Loader=yaml.SafeLoader
@@ -134,7 +135,7 @@ def process_messages(self):
         while True:
             msg_pack = consumer.poll(
                 timeout_ms=5000,
                max_records=self.args.kafka_max_poll_records,
-                max_records=self.kafka_max_poll_records,
+                max_records=self.args.kafka_max_poll_records,
                 update_offsets=True,
             )
             for topic, messages in msg_pack.items():

From bd4648ee5f885ec12e7b3cea43aca99431f1d555 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 15:49:40 +0100
Subject: [PATCH 22/31] lint fix kafka_init: comment with single #

---
 opl/kafka_init.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 6b90dfc..de1ff84 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -1,10 +1,8 @@
 from kafka import KafkaProducer, KafkaConsumer
 
-# from . import status_data
 import logging
 
-## Common instantiators for KafkaProducer and KafkaConsumer
-
+# Common instantiators for KafkaProducer and KafkaConsumer
 
 def kafka_bootstrap(args):

From 4478fbfc2a8facb5f04cb097ad7d41a6365d7245 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Fri, 16 Feb 2024 15:54:18 +0100
Subject: [PATCH 23/31] fix lint blank line

---
 opl/kafka_init.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index de1ff84..10e3651 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -4,6 +4,7 @@
 
 # Common instantiators for KafkaProducer and KafkaConsumer
 
+
 def kafka_bootstrap(args):

From 7948848fd850689906484c958ce79d098b41ad50 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Mon, 19 Feb 2024 16:38:55 +0100
Subject: [PATCH 24/31] fix(kafka_init): Default to a value without throwing
 an exception

---
 opl/kafka_init.py | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 10e3651..6db31b3 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -31,14 +31,14 @@ def get_producer(args, status_data=None):
     else:
         common_params = {
             "bootstrap_servers": bootstrap_servers,
-            "acks": args.kafka_acks,
-            "request_timeout_ms": args.kafka_request_timeout_ms or 30000,
-            "max_block_ms": args.kafka_max_block_ms or 60000,
-            "linger_ms": args.kafka_linger_ms or 0,
-            "compression_type": args.kafka_compression_type or None,
-            "batch_size": args.kafka_batch_size or 16384,
-            "buffer_memory": args.kafka_buffer_memory or 33554432,
-            "retries": args.kafka_retries or 0,
+            "acks": getattr(args, "kafka_acks", None),
+            "request_timeout_ms": getattr(args, "kafka_request_timeout_ms", 30000),
+            "max_block_ms": getattr(args, "kafka_max_block_ms", 60000),
+            "linger_ms": getattr(args, "kafka_linger_ms", 0),
+            "compression_type": getattr(args, "kafka_compression_type", None),
+            "batch_size": getattr(args, "kafka_batch_size", 16384),
+            "buffer_memory": getattr(args, "kafka_buffer_memory", 33554432),
+            "retries": getattr(args, "kafka_retries", 0),
         }
 
         if args.kafka_username != "" and args.kafka_password != "":
@@ -73,14 +73,14 @@ def get_consumer(args, status_data=None):
     # Common parameters for both cases
     common_params = {
         "bootstrap_servers": bootstrap_servers,
-        "auto_offset_reset": args.kafka_auto_offset_reset or "latest",
-        "enable_auto_commit": args.kafka_enable_auto_commit or False,
-        "max_poll_records": args.kafka_max_poll_records or 50,
-        "max_poll_interval_ms": args.kafka_max_poll_interval_ms or 300000,
-        "group_id": args.kafka_group,
-        "session_timeout_ms": args.kafka_session_timeout_ms or 50000,
-        "heartbeat_interval_ms": args.kafka_heartbeat_interval_ms or 10000,
-        "consumer_timeout_ms": args.kafka_timeout or 100000,
+        "auto_offset_reset": getattr(args, "kafka_auto_offset_reset", "latest"),
+        "enable_auto_commit": getattr(args, "kafka_enable_auto_commit", False),
+        "max_poll_records": getattr(args, "kafka_max_poll_records", 50),
+        "max_poll_interval_ms": getattr(args, "kafka_max_poll_interval_ms", 300000),
+        "group_id": getattr(args, "kafka_group", None),
+        "session_timeout_ms": getattr(args, "kafka_session_timeout_ms", 50000),
+        "heartbeat_interval_ms": getattr(args, "kafka_heartbeat_interval_ms", 10000),
+        "consumer_timeout_ms": getattr(args, "kafka_timeout", 100000),
     }
 
     # Kafka consumer creation: SASL or noauth
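The getattr() form used above tolerates namespaces that never defined the attribute at all, which the earlier "args.kafka_acks or default" spelling did not; a standalone comparison with a made-up empty namespace:

    from types import SimpleNamespace

    args = SimpleNamespace()  # no kafka_* attributes defined at all
    # args.kafka_acks or None               -> would raise AttributeError
    print(getattr(args, "kafka_acks", None))  # -> None, the intended default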
From 790d815bafc63bf31057152acc67094e073d1362 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Mon, 19 Feb 2024 18:10:39 +0100
Subject: [PATCH 25/31] fix(Kafka init): fix username and password check path
 and logging args path

---
 opl/kafka_init.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 6db31b3..6ab7dcf 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -84,20 +84,20 @@ def get_consumer(args, status_data=None):
     }
 
     # Kafka consumer creation: SASL or noauth
-    if args.username != "" and args.password != "":
+    if args.kafka_username != "" and args.kafka_password != "":
         logging.info(
-            f"Creating SASL password-protected Kafka consumer for {args.bootstrap_servers} in group {args.kafka_group} with timeout {args.session_timeout_ms or 50000} ms"
+            f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {args.kafka_group} with timeout {args.session_timeout_ms or 50000} ms"
         )
         sasl_params = {
             "security_protocol": "SASL_SSL",
             "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.username,
-            "sasl_plain_password": args.password,
+            "sasl_plain_username": args.kafka_username,
+            "sasl_plain_password": args.kafka_password,
         }
         consumer = KafkaConsumer(**common_params, **sasl_params)
     else:
         logging.info(
-            f"Creating passwordless Kafka consumer for {args.bootstrap_servers} in group {args.kafka_group} with timeout {common_params['session_timeout_ms']} ms"
+            f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {args.kafka_group} with timeout {args.kafka_session_timeout_ms} ms"
         )
         consumer = KafkaConsumer(**common_params)

From 4d0eb40560101c84f664f4407064e30440f194b2 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Mon, 19 Feb 2024 18:38:24 +0100
Subject: [PATCH 26/31] fix(Kafka init): obtain logging params from sanitized
 initialization params

---
 opl/kafka_init.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 6ab7dcf..bc675f0 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -86,7 +86,7 @@ def get_consumer(args, status_data=None):
     # Kafka consumer creation: SASL or noauth
     if args.kafka_username != "" and args.kafka_password != "":
         logging.info(
-            f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {args.kafka_group} with timeout {args.session_timeout_ms or 50000} ms"
+            f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
         )
         sasl_params = {
             "security_protocol": "SASL_SSL",
@@ -97,7 +97,7 @@ def get_consumer(args, status_data=None):
         consumer = KafkaConsumer(**common_params, **sasl_params)
     else:
         logging.info(
-            f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {args.kafka_group} with timeout {args.kafka_session_timeout_ms} ms"
+            f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
        )
         consumer = KafkaConsumer(**common_params)

From ea0f217b8576fe2a4daa11d1e5bdb7c4491ecdc2 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Mon, 19 Feb 2024 20:52:23 +0100
Subject: [PATCH 27/31] feat(kafka_init): Wrap in same-named class for outside
 use

---
 opl/kafka_init.py | 168 +++++++++++++++++++++++-----------------------
 1 file changed, 85 insertions(+), 83 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index bc675f0..a9b7567 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -5,45 +5,90 @@
 # Common instantiators for KafkaProducer and KafkaConsumer
 
 
-def kafka_bootstrap(args):
-    try:
+class kafka_init:
+
+    def kafka_bootstrap(args):
+        try:
-            return args.kafka_bootstrap
-        except AttributeError:
-            try:
-                if args.kafka_hosts != "":
-                    return args.kafka_hosts.split(",")
+            return args.kafka_bootstrap
         except AttributeError:
-            pass
-    return f"{args.kafka_host}:{args.kafka_port}"
-
-
-# Based on the args, obtain KafkaProducer instance
-def get_producer(args, status_data=None):
-    bootstrap_servers = kafka_bootstrap(args)
-
-    # Sanitize acks setting
-    if args.kafka_acks != "all":
-        args.kafka_acks = int(args.kafka_acks)
+            try:
+                if args.kafka_hosts != "":
+                    return args.kafka_hosts.split(",")
+            except AttributeError:
+                pass
+        return f"{args.kafka_host}:{args.kafka_port}"
+
+    # Based on the args, obtain KafkaProducer instance
+    def get_producer(args, status_data=None):
+        bootstrap_servers = kafka_bootstrap(args)
+
+        # Sanitize acks setting
+        if args.kafka_acks != "all":
+            args.kafka_acks = int(args.kafka_acks)
+
+        if hasattr(args, "dry_run") and args.dry_run:
+            logging.info("NOT creating a producer as this is a dry run")
+            producer = None
+        else:
+            common_params = {
+                "bootstrap_servers": bootstrap_servers,
+                "acks": getattr(args, "kafka_acks", None),
+                "request_timeout_ms": getattr(args, "kafka_request_timeout_ms", 30000),
+                "max_block_ms": getattr(args, "kafka_max_block_ms", 60000),
+                "linger_ms": getattr(args, "kafka_linger_ms", 0),
+                "compression_type": getattr(args, "kafka_compression_type", None),
+                "batch_size": getattr(args, "kafka_batch_size", 16384),
+                "buffer_memory": getattr(args, "kafka_buffer_memory", 33554432),
+                "retries": getattr(args, "kafka_retries", 0),
+            }
 
-    if hasattr(args, "dry_run") and args.dry_run:
-        logging.info("NOT creating a producer as this is a dry run")
-        producer = None
-    else:
+            if args.kafka_username != "" and args.kafka_password != "":
+                logging.info(
+                    f"Creating SASL password-protected producer to {bootstrap_servers}"
+                )
+                sasl_params = {
+                    "security_protocol": "SASL_SSL",
+                    "sasl_mechanism": "SCRAM-SHA-512",
+                    "sasl_plain_username": args.kafka_username,
+                    "sasl_plain_password": args.kafka_password,
+                }
+                producer = KafkaProducer(**common_params, **sasl_params)
+            else:
+                logging.info(f"Creating passwordless producer to {bootstrap_servers}")
+                producer = KafkaProducer(**common_params)
+
+        if status_data is not None:
+            status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
+            status_data.set("parameters.kafka.group", args.kafka_group)
+            status_data.set("parameters.kafka.topic", args.kafka_topic)
+            status_data.set("parameters.kafka.timeout", args.kafka_timeout)
+
+        return producer
+
+    # Based on the args, obtain KafkaConsumer instance.
+    # If args.kafka_topic is supplied, subscribe to the topic.
+    def get_consumer(args, status_data=None):
+        bootstrap_servers = kafka_bootstrap(args)
+
+        # Common parameters for both cases
         common_params = {
             "bootstrap_servers": bootstrap_servers,
-            "acks": getattr(args, "kafka_acks", None),
-            "request_timeout_ms": getattr(args, "kafka_request_timeout_ms", 30000),
-            "max_block_ms": getattr(args, "kafka_max_block_ms", 60000),
-            "linger_ms": getattr(args, "kafka_linger_ms", 0),
-            "compression_type": getattr(args, "kafka_compression_type", None),
-            "batch_size": getattr(args, "kafka_batch_size", 16384),
-            "buffer_memory": getattr(args, "kafka_buffer_memory", 33554432),
-            "retries": getattr(args, "kafka_retries", 0),
+            "auto_offset_reset": getattr(args, "kafka_auto_offset_reset", "latest"),
+            "enable_auto_commit": getattr(args, "kafka_enable_auto_commit", False),
+            "max_poll_records": getattr(args, "kafka_max_poll_records", 50),
+            "max_poll_interval_ms": getattr(args, "kafka_max_poll_interval_ms", 300000),
+            "group_id": getattr(args, "kafka_group", None),
+            "session_timeout_ms": getattr(args, "kafka_session_timeout_ms", 50000),
+            "heartbeat_interval_ms": getattr(
+                args, "kafka_heartbeat_interval_ms", 10000
+            ),
+            "consumer_timeout_ms": getattr(args, "kafka_timeout", 100000),
         }
 
+        # Kafka consumer creation: SASL or noauth
         if args.kafka_username != "" and args.kafka_password != "":
             logging.info(
-                f"Creating SASL password-protected producer to {bootstrap_servers}"
+                f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
             )
             sasl_params = {
                 "security_protocol": "SASL_SSL",
@@ -51,63 +96,20 @@ def get_producer(args, status_data=None):
                 "sasl_plain_username": args.kafka_username,
                 "sasl_plain_password": args.kafka_password,
             }
-            producer = KafkaProducer(**common_params, **sasl_params)
+            consumer = KafkaConsumer(**common_params, **sasl_params)
         else:
-            logging.info(f"Creating passwordless producer to {bootstrap_servers}")
-            producer = KafkaProducer(**common_params)
+            logging.info(
+                f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
+            )
+            consumer = KafkaConsumer(**common_params)
 
         if status_data is not None:
             status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
             status_data.set("parameters.kafka.group", args.kafka_group)
-            status_data.set("parameters.kafka.topic", args.kafka_topic)
+            status_data.set("parameters.kafka.topic", args.kafka_topic or "")
             status_data.set("parameters.kafka.timeout", args.kafka_timeout)
 
-        return producer
-
-
-# Based on the args, obtain KafkaConsumer instance.
-# If args.kafka_topic is supplied, subscribe to the topic.
-def get_consumer(args, status_data=None):
-    bootstrap_servers = kafka_bootstrap(args)
-
-    # Common parameters for both cases
-    common_params = {
-        "bootstrap_servers": bootstrap_servers,
-        "auto_offset_reset": getattr(args, "kafka_auto_offset_reset", "latest"),
-        "enable_auto_commit": getattr(args, "kafka_enable_auto_commit", False),
-        "max_poll_records": getattr(args, "kafka_max_poll_records", 50),
-        "max_poll_interval_ms": getattr(args, "kafka_max_poll_interval_ms", 300000),
-        "group_id": getattr(args, "kafka_group", None),
-        "session_timeout_ms": getattr(args, "kafka_session_timeout_ms", 50000),
-        "heartbeat_interval_ms": getattr(args, "kafka_heartbeat_interval_ms", 10000),
-        "consumer_timeout_ms": getattr(args, "kafka_timeout", 100000),
-    }
-
-    # Kafka consumer creation: SASL or noauth
-    if args.kafka_username != "" and args.kafka_password != "":
-        logging.info(
-            f"Creating SASL password-protected Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
-        )
-        sasl_params = {
-            "security_protocol": "SASL_SSL",
-            "sasl_mechanism": "SCRAM-SHA-512",
-            "sasl_plain_username": args.kafka_username,
-            "sasl_plain_password": args.kafka_password,
-        }
-        consumer = KafkaConsumer(**common_params, **sasl_params)
-    else:
-        logging.info(
-            f"Creating passwordless Kafka consumer for {bootstrap_servers} in group {common_params['group_id']} with timeout {common_params['session_timeout_ms']} ms"
-        )
-        consumer = KafkaConsumer(**common_params)
-
-    if status_data is not None:
-        status_data.set("parameters.kafka.bootstrap", bootstrap_servers)
-        status_data.set("parameters.kafka.group", args.kafka_group)
-        status_data.set("parameters.kafka.topic", args.kafka_topic or "")
-        status_data.set("parameters.kafka.timeout", args.kafka_timeout)
-
-    if args.kafka_topic and args.kafka_topic != "":
-        consumer.subscribe([args.kafka_topic])
-
-    return consumer
+        if args.kafka_topic and args.kafka_topic != "":
+            consumer.subscribe([args.kafka_topic])
+
+        return consumer
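The wrapper class leaves one problem behind that PATCH 29 below fixes: a class body is not an enclosing scope at call time, so the bare kafka_bootstrap(args) calls inside get_producer/get_consumer would raise NameError and need the class-qualified path. A generic illustration of that Python scoping rule (the names here are made up, not from opl):

    class holder:
        def helper(x):
            return x + 1

        def broken(x):
            return helper(x)  # NameError: class body is not an enclosing scope

        def fixed(x):
            return holder.helper(x)  # works: explicit class-qualified lookup

    print(holder.fixed(1))  # -> 2; holder.broken(1) would raise NameError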
From 9a3221764b987f56bfece0ffe0bafbcbf906e8c5 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Mon, 19 Feb 2024 22:44:27 +0100
Subject: [PATCH 28/31] fix(msk): Fix path to kafka_init due to added class

---
 opl/get_kafka_times.py  | 2 +-
 opl/hbi_utils.py        | 2 +-
 opl/post_kafka_times.py | 2 +-
 opl/skip_to_end.py      | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py
index de277d1..0ef02b3 100644
--- a/opl/get_kafka_times.py
+++ b/opl/get_kafka_times.py
@@ -9,7 +9,7 @@
 import opl.data
 import opl.db
 import opl.skelet
-import opl.kafka_init as kafka_init
+from opl.kafka_init import kafka_init
 
 import psycopg2
 import psycopg2.extras
diff --git a/opl/hbi_utils.py b/opl/hbi_utils.py
index e42338e..a494417 100644
--- a/opl/hbi_utils.py
+++ b/opl/hbi_utils.py
@@ -8,7 +8,7 @@
 import opl.db
 import opl.generators.inventory_ingress
 import opl.skelet
-import opl.kafka_init as kafka_init
+from opl.kafka_init import kafka_init
 
 import psycopg2
 
diff --git a/opl/post_kafka_times.py b/opl/post_kafka_times.py
index 7360986..d5fb05e 100644
--- a/opl/post_kafka_times.py
+++ b/opl/post_kafka_times.py
@@ -11,7 +11,7 @@
 import opl.data
 import opl.db
 import opl.skelet
-import opl.kafka_init as kafka_init
+from opl.kafka_init import kafka_init
 
 import psycopg2
 import psycopg2.extras
diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py
index 81ed4e8..cedde49 100755
--- a/opl/skip_to_end.py
+++ b/opl/skip_to_end.py
@@ -5,7 +5,7 @@
 import os
 import time
 
-from . import kafka_init
+from .kafka_init import kafka_init
 from . import args
 from . import skelet

From afc51ae98e5ae85d6db3e39b5dccf16e9fadcc23 Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Tue, 20 Feb 2024 15:17:21 +0100
Subject: [PATCH 29/31] fix(kafka_init): Fix reference to internal class
 method, make it static

---
 opl/kafka_init.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index a9b7567..8f864b5 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -7,6 +7,7 @@
 
 class kafka_init:
 
+    @staticmethod
     def kafka_bootstrap(args):
         try:
             return args.kafka_bootstrap
@@ -20,7 +21,7 @@ def kafka_bootstrap(args):
 
     # Based on the args, obtain KafkaProducer instance
     def get_producer(args, status_data=None):
-        bootstrap_servers = kafka_bootstrap(args)
+        bootstrap_servers = kafka_init.kafka_bootstrap(args)
 
         # Sanitize acks setting
         if args.kafka_acks != "all":
@@ -68,7 +69,7 @@ def get_producer(args, status_data=None):
     # Based on the args, obtain KafkaConsumer instance.
     # If args.kafka_topic is supplied, subscribe to the topic.
     def get_consumer(args, status_data=None):
-        bootstrap_servers = kafka_bootstrap(args)
+        bootstrap_servers = kafka_init.kafka_bootstrap(args)
 
         # Common parameters for both cases
         common_params = {

From 53c32d5482d6a981b1c00afa5fbb86750977e53d Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Tue, 20 Feb 2024 16:07:43 +0100
Subject: [PATCH 30/31] feat(kafka init): make producer&consumer getters
 static

---
 opl/kafka_init.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 8f864b5..70f98b3 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -20,6 +20,7 @@ def kafka_bootstrap(args):
         return f"{args.kafka_host}:{args.kafka_port}"
 
     # Based on the args, obtain KafkaProducer instance
+    @staticmethod
     def get_producer(args, status_data=None):
         bootstrap_servers = kafka_init.kafka_bootstrap(args)
 
@@ -68,6 +69,7 @@ def get_producer(args, status_data=None):
 
     # Based on the args, obtain KafkaConsumer instance.
     # If args.kafka_topic is supplied, subscribe to the topic.
+    @staticmethod
     def get_consumer(args, status_data=None):
         bootstrap_servers = kafka_init.kafka_bootstrap(args)

From 5c5fa3c8658448624c1d1b01bf0e4a87011609da Mon Sep 17 00:00:00 2001
From: Jan Smejkal
Date: Tue, 20 Feb 2024 16:25:13 +0100
Subject: [PATCH 31/31] lint fix: Remove leading newline, fix import

---
 opl/consumer_lag.py | 3 +--
 opl/kafka_init.py   | 1 -
 2 files changed, 1 insertion(+), 3 deletions(-)

diff --git a/opl/consumer_lag.py b/opl/consumer_lag.py
index 4e3e2d5..85885b7 100644
--- a/opl/consumer_lag.py
+++ b/opl/consumer_lag.py
@@ -3,8 +3,7 @@
 import logging
 import time
 from kafka import TopicPartition
-
-import opl.kafka_init as kafka_init
+from opl.kafka_init import kafka_init
 
 
 class ConsumerLag:
diff --git a/opl/kafka_init.py b/opl/kafka_init.py
index 70f98b3..532e834 100644
--- a/opl/kafka_init.py
+++ b/opl/kafka_init.py
@@ -6,7 +6,6 @@
 
 
 class kafka_init:
-
     @staticmethod
     def kafka_bootstrap(args):
         try:
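With the whole series applied, a caller only needs an argparse-style namespace; anything not supplied falls back to the getattr() defaults in kafka_init. A hypothetical end-to-end use (broker, group and topic names are made up, and a reachable broker is assumed):

    import argparse

    from opl.kafka_init import kafka_init

    args = argparse.Namespace(
        kafka_hosts="b1.example.com:9092,b2.example.com:9092",  # takes precedence
        kafka_host="localhost",
        kafka_port=9092,
        kafka_group="perf-test",
        kafka_topic="platform.some.topic",
        kafka_timeout=10000,
        kafka_username="",
        kafka_password="",
    )
    consumer = kafka_init.get_consumer(args)  # passwordless, subscribed to the topic
    for message in consumer:
        print(message.value)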