diff --git a/CHANGES/5850.bugfix b/CHANGES/5850.bugfix
new file mode 100644
index 0000000000..069e48d230
--- /dev/null
+++ b/CHANGES/5850.bugfix
@@ -0,0 +1 @@
+Made Kafka dependencies optional depending on whether the producer is configured.
diff --git a/pulpcore/tasking/kafka.py b/pulpcore/tasking/kafka.py
index 37cb9643ce..0834f06d81 100644
--- a/pulpcore/tasking/kafka.py
+++ b/pulpcore/tasking/kafka.py
@@ -4,62 +4,105 @@
 from threading import Thread
 from typing import Optional
 
-from confluent_kafka import Producer
+
 from django.conf import settings
 
-_logger = logging.getLogger(__name__)
-_kafka_producer = None
+
 _bootstrap_servers = settings.get("KAFKA_BOOTSTRAP_SERVERS")
-_producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
-_security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
-_ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
-_sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
-_sasl_username = settings.get("KAFKA_SASL_USERNAME")
-_sasl_password = settings.get("KAFKA_SASL_PASSWORD")
-
-
-class KafkaProducerPollingWorker:
-    def __init__(self, kafka_producer):
-        self._kafka_producer = kafka_producer
-        self._running = False
-        self._thread = None
-
-    def start(self):
-        self._running = True
-        self._thread = Thread(target=self._run)
-        self._thread.start()
-
-    def _run(self):
-        while self._running:
-            self._kafka_producer.poll(_producer_poll_timeout)
-            self._kafka_producer.flush()
-
-    def stop(self):
-        self._running = False
-        self._thread.join()
-
-
-def get_kafka_producer() -> Optional[Producer]:
-    global _kafka_producer
-    if _bootstrap_servers is None:
-        return None
-    if _kafka_producer is None:
-        conf = {
-            "bootstrap.servers": _bootstrap_servers,
-            "security.protocol": _security_protocol,
-            "client.id": socket.gethostname(),
-        }
-        optional_conf = {
-            "ssl.ca.pem": _ssl_ca_pem,
-            "sasl.mechanisms": _sasl_mechanism,
-            "sasl.username": _sasl_username,
-            "sasl.password": _sasl_password,
+
+
+if _bootstrap_servers is None:
+
+    def send_task_notification(task):
+        pass
+
+else:
+    from confluent_kafka import Producer
+
+    # NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols
+    from cloudevents.http import CloudEvent
+    from cloudevents.kafka import to_structured
+    from pulpcore.app.serializers.task import TaskStatusMessageSerializer
+
+    _logger = logging.getLogger(__name__)
+    _kafka_producer = None
+    _producer_poll_timeout = settings.get("KAFKA_PRODUCER_POLL_TIMEOUT")
+    _security_protocol = settings.get("KAFKA_SECURITY_PROTOCOL")
+    _ssl_ca_pem = settings.get("KAFKA_SSL_CA_PEM")
+    _sasl_mechanism = settings.get("KAFKA_SASL_MECHANISM")
+    _sasl_username = settings.get("KAFKA_SASL_USERNAME")
+    _sasl_password = settings.get("KAFKA_SASL_PASSWORD")
+
+    _kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC")
+    _kafka_tasks_status_producer_sync_enabled = settings.get(
+        "KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED"
+    )
+
+    class KafkaProducerPollingWorker:
+        def __init__(self, kafka_producer):
+            self._kafka_producer = kafka_producer
+            self._running = False
+            self._thread = None
+
+        def start(self):
+            self._running = True
+            self._thread = Thread(target=self._run)
+            self._thread.start()
+
+        def _run(self):
+            while self._running:
+                self._kafka_producer.poll(_producer_poll_timeout)
+                self._kafka_producer.flush()
+
+        def stop(self):
+            self._running = False
+            self._thread.join()
+
+    def _get_kafka_producer() -> Optional[Producer]:
+        global _kafka_producer
+        if _kafka_producer is None:
+            conf = {
+                "bootstrap.servers": _bootstrap_servers,
+                "security.protocol": _security_protocol,
+                "client.id": socket.gethostname(),
+            }
+            optional_conf = {
+                "ssl.ca.pem": _ssl_ca_pem,
+                "sasl.mechanisms": _sasl_mechanism,
+                "sasl.username": _sasl_username,
+                "sasl.password": _sasl_password,
+            }
+            for key, value in optional_conf.items():
+                if value:
+                    conf[key] = value
+            _kafka_producer = Producer(conf, logger=_logger)
+            polling_worker = KafkaProducerPollingWorker(_kafka_producer)
+            polling_worker.start()
+            atexit.register(polling_worker.stop)
+        return _kafka_producer
+
+    def _report_message_delivery(error, message):
+        if error is not None:
+            _logger.error(error)
+        elif _logger.isEnabledFor(logging.DEBUG):
+            _logger.debug(f"Message delivered successfully with contents {message.value}")
+
+    def send_task_notification(task):
+        kafka_producer = _get_kafka_producer()
+        attributes = {
+            "type": "pulpcore.tasking.status",
+            "source": "pulpcore.tasking",
+            "datacontenttype": "application/json",
+            "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml",
         }
-        for key, value in optional_conf.items():
-            if value:
-                conf[key] = value
-        _kafka_producer = Producer(conf, logger=_logger)
-        polling_worker = KafkaProducerPollingWorker(_kafka_producer)
-        polling_worker.start()
-        atexit.register(polling_worker.stop)
-    return _kafka_producer
+        data = TaskStatusMessageSerializer(task, context={"request": None}).data
+        task_message = to_structured(CloudEvent(attributes, data))
+        kafka_producer.produce(
+            topic=_kafka_tasks_status_topic,
+            value=task_message.value,
+            key=task_message.key,
+            headers=task_message.headers,
+            on_delivery=_report_message_delivery,
+        )
+        if _kafka_tasks_status_producer_sync_enabled:
+            kafka_producer.flush()
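The core of the kafka.py change is the module-level guard: `send_task_notification` is defined on both branches, so callers can import and call it unconditionally, while `confluent_kafka` and `cloudevents` are imported (and therefore need to be installed) only when `KAFKA_BOOTSTRAP_SERVERS` is set. A minimal sketch of the same pattern, with illustrative names rather than pulpcore code:

```python
# Sketch of the import-guard pattern used above (illustrative names, not
# pulpcore API): the optional dependency is imported only when the feature
# is configured, and callers get a no-op stub otherwise.
import os

_broker = os.environ.get("EXAMPLE_BROKER")  # stand-in for KAFKA_BOOTSTRAP_SERVERS

if _broker is None:

    def notify(payload):
        """No-op: broker not configured, so nothing is imported or sent."""

else:
    from confluent_kafka import Producer  # only evaluated on this branch

    _producer = Producer({"bootstrap.servers": _broker})

    def notify(payload):
        _producer.produce(topic="example.topic", value=payload)
        _producer.flush()
```

One trade-off worth noting: an unset (or misspelled) `KAFKA_BOOTSTRAP_SERVERS` silently selects the no-op branch rather than failing fast at import time.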
"security.protocol": _security_protocol, + "client.id": socket.gethostname(), + } + optional_conf = { + "ssl.ca.pem": _ssl_ca_pem, + "sasl.mechanisms": _sasl_mechanism, + "sasl.username": _sasl_username, + "sasl.password": _sasl_password, + } + for key, value in optional_conf.items(): + if value: + conf[key] = value + _kafka_producer = Producer(conf, logger=_logger) + polling_worker = KafkaProducerPollingWorker(_kafka_producer) + polling_worker.start() + atexit.register(polling_worker.stop) + return _kafka_producer + + def _report_message_delivery(error, message): + if error is not None: + _logger.error(error) + elif _logger.isEnabledFor(logging.DEBUG): + _logger.debug(f"Message delivery successfully with contents {message.value}") + + def send_task_notification(task): + kafka_producer = _get_kafka_producer() + attributes = { + "type": "pulpcore.tasking.status", + "source": "pulpcore.tasking", + "datacontenttype": "application/json", + "dataref": "https://github.com/pulp/pulpcore/blob/main/docs/static/task-status-v1.yaml", } - for key, value in optional_conf.items(): - if value: - conf[key] = value - _kafka_producer = Producer(conf, logger=_logger) - polling_worker = KafkaProducerPollingWorker(_kafka_producer) - polling_worker.start() - atexit.register(polling_worker.stop) - return _kafka_producer + data = TaskStatusMessageSerializer(task, context={"request": None}).data + task_message = to_structured(CloudEvent(attributes, data)) + kafka_producer.produce( + topic=_kafka_tasks_status_topic, + value=task_message.value, + key=task_message.key, + headers=task_message.headers, + on_delivery=_report_message_delivery, + ) + if _kafka_tasks_status_producer_sync_enabled: + kafka_producer.flush() diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 70cdbc392b..550af56f57 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -8,16 +8,11 @@ from datetime import timedelta from gettext import gettext as _ -# NOTE: in spite of the name, cloudevents.http.CloudEvent is appropriate for other protocols -from cloudevents.http import CloudEvent -from cloudevents.kafka import to_structured -from django.conf import settings from django.db import connection, transaction from django.db.models import Model, Max from django_guid import get_guid from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS from pulpcore.app.models import Task, TaskGroup -from pulpcore.app.serializers.task import TaskStatusMessageSerializer from pulpcore.app.util import current_task, get_domain, get_prn from pulpcore.constants import ( TASK_FINAL_STATES, @@ -25,13 +20,10 @@ TASK_STATES, TASK_DISPATCH_LOCK, ) -from pulpcore.tasking.kafka import get_kafka_producer +from pulpcore.tasking.kafka import send_task_notification _logger = logging.getLogger(__name__) -_kafka_tasks_status_topic = settings.get("KAFKA_TASKS_STATUS_TOPIC") -_kafka_tasks_status_producer_sync_enabled = settings.get("KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED") - def _validate_and_get_resources(resources): resource_set = set() @@ -92,11 +84,11 @@ def _execute_task(task): ) ) _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb)))) - _send_task_notification(task) + send_task_notification(task) else: task.set_completed() _logger.info(_("Task completed %s in domain: %s"), task.pk, domain.name) - _send_task_notification(task) + send_task_notification(task) def dispatch( @@ -305,32 +297,3 @@ def cancel_task_group(task_group_id): except RuntimeError: pass return task_group - - -def _send_task_notification(task): - kafka_producer 
diff --git a/requirements.txt b/requirements.txt
index 3c2e4b4399..3cf0744e38 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,8 +5,6 @@ asyncio-throttle>=1.0,<=1.0.2
 async-timeout>=4.0.3,<4.0.4;python_version<"3.11"
 backoff>=2.1.2,<2.2.2
 click>=8.1.0,<=8.1.7
-cloudevents==1.11.0  # Pinned because project warns "things might (and will) break with every update"
-confluent-kafka>=2.4.0,<2.6.0
 cryptography>=38.0.1,<43.0.2
 Django~=4.2.0  # LTS version, switch only if we have a compelling reason to
 django-filter>=23.1,<=24.3
diff --git a/setup.py b/setup.py
index 27542cd8e4..ed3a166a89 100644
--- a/setup.py
+++ b/setup.py
@@ -25,6 +25,11 @@
         "google": ["django-storages[google]==1.14.3"],
         "azure": ["django-storages[azure]==1.14.3"],
         "prometheus": ["django-prometheus"],
+        "kafka": [
+            # Pinned because project warns "things might (and will) break with every update"
+            "cloudevents==1.11.0",
+            "confluent-kafka>=2.4.0,<2.6.0",
+        ],
     },
     include_package_data=True,
     classifiers=[
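With the two dependencies moved from requirements.txt into a `kafka` extra, a deployment that wants task-status notifications installs `pulpcore[kafka]` and enables the producer through settings. A hypothetical dynaconf-style settings sketch (values illustrative): defining `KAFKA_BOOTSTRAP_SERVERS` is what switches `pulpcore.tasking.kafka` from the no-op stub to the real producer, while the other keys fall back to pulpcore's defaults.

```python
# Hypothetical deployment settings sketch; values are illustrative.
KAFKA_BOOTSTRAP_SERVERS = "kafka-1:9092,kafka-2:9092"  # activates the producer
KAFKA_TASKS_STATUS_TOPIC = "pulpcore.tasking.status"
KAFKA_TASKS_STATUS_PRODUCER_SYNC_ENABLED = False  # True flushes after every message
```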