diff --git a/opl/get_kafka_times.py b/opl/get_kafka_times.py index bf1e82c..1ec702a 100644 --- a/opl/get_kafka_times.py +++ b/opl/get_kafka_times.py @@ -85,38 +85,31 @@ def create_consumer(self): self.status_data.set("parameters.kafka.topic", self.kafka_topic) self.status_data.set("parameters.kafka.timeout", self.kafka_timeout) - try: + common_params = { + "bootstrap_servers": self.kafka_host, + "auto_offset_reset": "earliest", + "enable_auto_commit": True, + "group_id": self.kafka_group, + "max_poll_records": self.kafka_max_poll_records, + "session_timeout_ms": 50000, + "heartbeat_interval_ms": 10000, + "consumer_timeout_ms": self.kafka_timeout, + } + + if self.kafka_username != "" and self.kafka_password != "": logging.info( f"Creating consumer with sasl username&pasword to {self.kafka_host}" ) - consumer = KafkaConsumer( - self.kafka_topic, - bootstrap_servers=self.kafka_host, # self.kafka_hosts, - auto_offset_reset="earliest", - enable_auto_commit=True, - group_id=self.kafka_group, - max_poll_records=self.kafka_max_poll_records, - session_timeout_ms=50000, - heartbeat_interval_ms=10000, - consumer_timeout_ms=self.kafka_timeout, - security_protocol="SASL_SSL", - sasl_mechanism="SCRAM-SHA-512", - sasl_plain_username=self.kafka_username, - sasl_plain_password=self.kafka_password, - ) - except AttributeError: + sasl_params = { + "security_protocol": "SASL_SSL", + "sasl_mechanism": "SCRAM-SHA-512", + "sasl_plain_username": self.kafka_username, + "sasl_plain_password": self.kafka_password, + } + consumer = KafkaConsumer([self.kafka_topic], **common_params, **sasl_params) + else: logging.info(f"Creating passwordless consumer to {self.kafka_host}") - consumer = KafkaConsumer( - self.kafka_topic, - bootstrap_servers=self.kafka_host, - auto_offset_reset="earliest", - enable_auto_commit=True, - group_id=self.kafka_group, - max_poll_records=self.kafka_max_poll_records, - session_timeout_ms=50000, - heartbeat_interval_ms=10000, - consumer_timeout_ms=self.kafka_timeout, - ) + consumer = KafkaConsumer([self.kafka_topic], **common_params) return consumer def store_now(self): diff --git a/opl/skip_to_end.py b/opl/skip_to_end.py index 75d3a34..d219f2e 100755 --- a/opl/skip_to_end.py +++ b/opl/skip_to_end.py @@ -11,31 +11,22 @@ from . import skelet -def doit_seek_to_end(kafka_hosts, kafka_timeout, kafka_topic, kafka_group, args=""): - """ - Create consumer and seek to end - - This seek to end is important so we are not wasting our time processing - all the messages in the Kafka log for given topic. If we would have same - and static group name, we would have problems when running concurrently - on multiple pods. - """ - +def create_consumer(args): # Common parameters for both cases common_params = { - "bootstrap_servers": kafka_hosts, + "bootstrap_servers": args.kafka_hosts, "auto_offset_reset": "latest", "enable_auto_commit": True, - "group_id": kafka_group, + "group_id": args.kafka_group, "session_timeout_ms": 50000, "heartbeat_interval_ms": 10000, - "consumer_timeout_ms": kafka_timeout, + "consumer_timeout_ms": args.kafka_timeout, } # Kafka consumer creation: SASL or noauth - try: + if args.kafka_username != "" and args.kafka_password != "": logging.info( - f"Creating SASL password-protected Kafka consumer for {kafka_hosts} in group {kafka_group} with timeout {kafka_timeout} ms topic {kafka_topic}" + f"Creating SASL password-protected Kafka consumer for {args.kafka_hosts} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}" ) sasl_params = { "security_protocol": "SASL_SSL", @@ -43,14 +34,26 @@ def doit_seek_to_end(kafka_hosts, kafka_timeout, kafka_topic, kafka_group, args= "sasl_plain_username": args.kafka_username, "sasl_plain_password": args.kafka_password, } - consumer = KafkaConsumer(**common_params, **sasl_params) - except AttributeError: + consumer = KafkaConsumer([args.kafka_topic], **common_params, **sasl_params) + else: logging.info( - f"Creating passwordless producer for for {kafka_hosts} in group {kafka_group} with timeout {kafka_timeout} ms topic {kafka_topic}" + f"Creating passwordless producer for for {args.kafka_hosts} in group {args.kafka_group} with timeout {args.kafka_timeout} ms topic {args.kafka_topic}" ) - consumer = KafkaConsumer(**common_params) + consumer = KafkaConsumer([args.kafka_topic], **common_params) + return consumer - consumer.subscribe(kafka_topic) + +def doit_seek_to_end(args): + """ + Create consumer and seek to end + + This seek to end is important so we are not wasting our time processing + all the messages in the Kafka log for given topic. If we would have same + and static group name, we would have problems when running concurrently + on multiple pods. + """ + + consumer = create_consumer(args) # Seek to end for attempt in range(10): @@ -71,13 +74,7 @@ def doit_seek_to_end(kafka_hosts, kafka_timeout, kafka_topic, kafka_group, args= def doit(args, status_data): - doit_seek_to_end( - [f"{args.kafka_host}:{args.kafka_port}"], - args.kafka_timeout, - args.kafka_topic, - args.kafka_group, - args, - ) + doit_seek_to_end(args) status_data.set("parameters.kafka.seek_topic", args.kafka_topic) status_data.set("parameters.kafka.seek_timeout", args.kafka_timeout)