From 3845f7e326fe6d08e8e855d7434d0a54a86ccb97 Mon Sep 17 00:00:00 2001
From: Anton Kuraev
Date: Tue, 4 Jun 2024 23:29:54 -0700
Subject: [PATCH] Kafka consumer reliability/performance improvements

---
 .../src/datahub_actions/pipeline/pipeline.py      | 28 ++++++---
 .../plugin/source/kafka/kafka_event_source.py     | 60 ++++++++++++++++++-
 .../plugin/source/kafka/utils.py                  | 26 ++++++++
 .../datahub_actions/source/event_source.py        |  2 +-
 datahub-actions/tests/unit/test_helpers.py        |  4 +-
 gradle/wrapper/gradle-wrapper.properties          |  2 +-
 6 files changed, 106 insertions(+), 16 deletions(-)
 create mode 100644 datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py

diff --git a/datahub-actions/src/datahub_actions/pipeline/pipeline.py b/datahub-actions/src/datahub_actions/pipeline/pipeline.py
index 8c81e72b..d4f78097 100644
--- a/datahub-actions/src/datahub_actions/pipeline/pipeline.py
+++ b/datahub-actions/src/datahub_actions/pipeline/pipeline.py
@@ -169,9 +169,15 @@ def run(self) -> None:
         enveloped_events = self.source.events()
         for enveloped_event in enveloped_events:
             # Then, process the event.
-            self._process_event(enveloped_event)
+            retval = self._process_event(enveloped_event)
+
+            # For legacy users w/o selective ack support, convert
+            # None to True, i.e. always commit.
+            if retval is None:
+                retval = True
+
             # Finally, ack the event.
-            self._ack_event(enveloped_event)
+            self._ack_event(enveloped_event, retval)
 
     def stop(self) -> None:
         """
@@ -189,10 +195,11 @@ def stats(self) -> PipelineStats:
         """
         return self._stats
 
-    def _process_event(self, enveloped_event: EventEnvelope) -> None:
+    def _process_event(self, enveloped_event: EventEnvelope) -> Optional[bool]:
         # Attempt to process the incoming event, with retry.
         curr_attempt = 1
         max_attempts = self._retry_count + 1
+        retval = None
         while curr_attempt <= max_attempts:
             try:
                 # First, transform the event.
@@ -200,10 +207,10 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
 
                 # Then, invoke the action if the event is non-null.
                 if transformed_event is not None:
-                    self._execute_action(transformed_event)
+                    retval = self._execute_action(transformed_event)
 
                 # Short circuit - processing has succeeded.
-                return
+                return retval
             except Exception:
                 logger.exception(
                     f"Caught exception while attempting to process event. Attempt {curr_attempt}/{max_attempts} event type: {enveloped_event.event_type}, pipeline name: {self.name}"
                 )
@@ -220,6 +227,8 @@ def _process_event(self, enveloped_event: EventEnvelope) -> None:
         # Finally, handle the failure
         self._handle_failure(enveloped_event)
 
+        return retval
+
     def _execute_transformers(
         self, enveloped_event: EventEnvelope
     ) -> Optional[EventEnvelope]:
@@ -254,19 +263,20 @@ def _execute_transformer(
                 f"Caught exception while executing Transformer with name {type(transformer).__name__}"
             ) from e
 
-    def _execute_action(self, enveloped_event: EventEnvelope) -> None:
+    def _execute_action(self, enveloped_event: EventEnvelope) -> Optional[bool]:
         try:
-            self.action.act(enveloped_event)
+            retval = self.action.act(enveloped_event)
             self._stats.increment_action_success_count()
+            return retval
         except Exception as e:
             self._stats.increment_action_exception_count()
             raise PipelineException(
                 f"Caught exception while executing Action with type {type(self.action).__name__}"
             ) from e
 
-    def _ack_event(self, enveloped_event: EventEnvelope) -> None:
+    def _ack_event(self, enveloped_event: EventEnvelope, processed: bool) -> None:
         try:
-            self.source.ack(enveloped_event)
+            self.source.ack(enveloped_event, processed)
             self._stats.increment_success_count()
         except Exception:
             self._stats.increment_failed_ack_count()
diff --git a/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py b/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
index 6e1d6624..3a85b5af 100644
--- a/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
+++ b/datahub-actions/src/datahub_actions/plugin/source/kafka/kafka_event_source.py
@@ -39,6 +39,7 @@
 
 # May or may not need these.
 from datahub_actions.pipeline.pipeline_context import PipelineContext
+from datahub_actions.plugin.source.kafka.utils import with_retry
 from datahub_actions.source.event_source import EventSource
 
 logger = logging.getLogger(__name__)
@@ -90,6 +91,10 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
 class KafkaEventSourceConfig(ConfigModel):
     connection: KafkaConsumerConnectionConfig = KafkaConsumerConnectionConfig()
     topic_routes: Optional[Dict[str, str]]
+    async_commit_enabled: bool = False
+    async_commit_interval: int = 10000
+    commit_retry_count: int = 5
+    commit_retry_backoff: float = 10.0
 
 
 def kafka_messages_observer(pipeline_name: str) -> Callable:
@@ -120,6 +125,16 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
         schema_client_config = config.connection.schema_registry_config.copy()
         schema_client_config["url"] = self.source_config.connection.schema_registry_url
         self.schema_registry_client = SchemaRegistryClient(schema_client_config)
+
+        async_commit_config: Dict[str, Any] = {}
+        if self.source_config.async_commit_enabled:
+            # See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
+            async_commit_config["enable.auto.offset.store"] = False
+            async_commit_config["enable.auto.commit"] = True
+            async_commit_config["auto.commit.interval.ms"] = (
+                self.source_config.async_commit_interval
+            )
+
         self.consumer: confluent_kafka.Consumer = confluent_kafka.DeserializingConsumer(
             {
                 # Provide a custom group id to subcribe to multiple partitions via separate actions pods.
@@ -134,6 +149,7 @@ def __init__(self, config: KafkaEventSourceConfig, ctx: PipelineContext):
                 "session.timeout.ms": "10000",  # 10s timeout.
                 "max.poll.interval.ms": "10000",  # 10s poll max.
                 **self.source_config.connection.consumer_config,
+                **async_commit_config,
             }
         )
         self._observe_message: Callable = kafka_messages_observer(ctx.pipeline_name)
@@ -201,16 +217,54 @@ def close(self) -> None:
             self.running = False
             self.consumer.close()
 
-    def ack(self, event: EventEnvelope) -> None:
-        self.consumer.commit(
+    def _commit_offsets(self, event: EventEnvelope) -> None:
+        retval = self.consumer.commit(
+            asynchronous=False,
             offsets=[
                 TopicPartition(
                     event.meta["kafka"]["topic"],
                     event.meta["kafka"]["partition"],
                     event.meta["kafka"]["offset"] + 1,
                 )
-            ]
+            ],
         )
+        if retval is None:
+            logger.exception(
+                f"Unexpected response when committing offset to kafka: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
+            )
+            return
+        for partition in retval:
+            if partition.error is not None:
+                raise KafkaException(
+                    f"Failed to commit offset for topic: {partition.topic}, partition: {partition.partition}, offset: {partition.offset}: {partition.error.str()}"
+                )
         logger.debug(
             f"Successfully committed offsets at message: topic: {event.meta['kafka']['topic']}, partition: {event.meta['kafka']['partition']}, offset: {event.meta['kafka']['offset']}"
         )
+
+    def _store_offsets(self, event: EventEnvelope) -> None:
+        self.consumer.store_offsets(
+            offsets=[
+                TopicPartition(
+                    event.meta["kafka"]["topic"],
+                    event.meta["kafka"]["partition"],
+                    event.meta["kafka"]["offset"] + 1,
+                )
+            ],
+        )
+
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
+        # See for details: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#auto-offset-commit
+
+        if processed or not self.source_config.async_commit_enabled:
+            # Immediately commit if the message was processed by the upstream,
+            # or delayed commit is disabled
+            with_retry(
+                self.source_config.commit_retry_count,
+                self.source_config.commit_retry_backoff,
+                self._commit_offsets,
+                event,
+            )
+        else:
+            # Otherwise store offset for periodic autocommit
+            self._store_offsets(event)
diff --git a/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py b/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
new file mode 100644
index 00000000..a416d519
--- /dev/null
+++ b/datahub-actions/src/datahub_actions/plugin/source/kafka/utils.py
@@ -0,0 +1,26 @@
+import logging
+import time
+from typing import Any, Callable
+
+logger = logging.getLogger(__name__)
+
+
+def with_retry(max_attempts: int, max_backoff: float, func: Callable, *args, **kwargs) -> Any:  # type: ignore
+    curr_attempt = 0
+    backoff = 0.3
+
+    while curr_attempt < max_attempts:
+        try:
+            return func(*args, **kwargs)
+        except Exception as e:
+            logger.error(str(e))
+
+            curr_attempt = curr_attempt + 1
+            if curr_attempt >= max_attempts:
+                logger.warning("kafka event source: exhausted all attempts.")
+                return
+
+            backoff = backoff * 2
+            if backoff > max_backoff:
+                backoff = max_backoff
+            time.sleep(backoff)
diff --git a/datahub-actions/src/datahub_actions/source/event_source.py b/datahub-actions/src/datahub_actions/source/event_source.py
index bcbbdb43..91c3163b 100644
--- a/datahub-actions/src/datahub_actions/source/event_source.py
+++ b/datahub-actions/src/datahub_actions/source/event_source.py
@@ -48,7 +48,7 @@ def events(self) -> Iterable[EventEnvelope]:
         """
 
     @abstractmethod
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         """
         Acknowledges the processing of an individual event by the Actions Framework
         """
diff --git a/datahub-actions/tests/unit/test_helpers.py b/datahub-actions/tests/unit/test_helpers.py
index e06106de..eeb1c68c 100644
--- a/datahub-actions/tests/unit/test_helpers.py
+++ b/datahub-actions/tests/unit/test_helpers.py
@@ -138,7 +138,7 @@ def events(self) -> Iterable[EventEnvelope]:
             EventEnvelope("TestEvent", TestEvent("value"), {}),
         ]
 
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         self.ack_count = self.ack_count + 1
 
     def close(self) -> None:
@@ -221,7 +221,7 @@ def events(self) -> Iterable[EventEnvelope]:
             "MetadataChangeLogEvent_v1", metadata_change_log_event, {}
         )
 
-    def ack(self, event: EventEnvelope) -> None:
+    def ack(self, event: EventEnvelope, processed: bool = True) -> None:
         pass
 
    def close(self) -> None:
diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties
index 5028f28f..e1bef7e8 100644
--- a/gradle/wrapper/gradle-wrapper.properties
+++ b/gradle/wrapper/gradle-wrapper.properties
@@ -1,5 +1,5 @@
 distributionBase=GRADLE_USER_HOME
 distributionPath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-5.6.4-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-8.0.2-bin.zip
 zipStoreBase=GRADLE_USER_HOME
 zipStorePath=wrapper/dists
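Note: the sketch below is illustrative only and is not part of the patch. It shows how a custom action could use the selective-ack contract introduced above: the pipeline now forwards whatever act() returns to source.ack(event, processed), so returning False makes the Kafka source only store the offset (to be picked up by the periodic auto-commit enabled via async_commit_enabled) instead of committing synchronously with retries. The import paths follow the datahub_actions layout referenced elsewhere in this patch; the class name and the filtering logic are hypothetical.

    from typing import Optional

    from datahub_actions.action.action import Action
    from datahub_actions.event.event_envelope import EventEnvelope
    from datahub_actions.pipeline.pipeline_context import PipelineContext


    class DeferredCommitAction(Action):
        """Hypothetical action that defers offset commits for events it skips."""

        @classmethod
        def create(cls, config_dict: dict, ctx: PipelineContext) -> "Action":
            return cls()

        def act(self, event: EventEnvelope) -> Optional[bool]:
            if event.event_type != "EntityChangeEvent_v1":
                # Skipped: ask the source to store the offset for async commit.
                return False
            # ... handle the event ...
            return True  # Processed: commit the offset immediately (with retry).

        def close(self) -> None:
            pass

For reference, the new Kafka source config keys added by this patch are async_commit_enabled, async_commit_interval (ms), commit_retry_count, and commit_retry_backoff (seconds).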