diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 459cdb61c..0c8d66b6d 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -11,7 +11,14 @@ from .backends.v3.schema import ChecksumAlgorithm from .backends.writer import BackupWriter, StdOut from .encoders import encode_key, encode_value -from .errors import BackupError, BackupTopicAlreadyExists, EmptyPartition, PartitionCountError, StaleConsumerError +from .errors import ( + BackupDataRestorationError, + BackupError, + BackupTopicAlreadyExists, + EmptyPartition, + PartitionCountError, + StaleConsumerError, +) from .poll_timeout import PollTimeout from .topic_configurations import ConfigSource, get_topic_configurations from enum import Enum @@ -20,6 +27,7 @@ from kafka.admin import KafkaAdminClient, NewTopic from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import KafkaError, TopicAlreadyExistsError +from kafka.future import Future from kafka.structs import PartitionMetadata, TopicPartition from karapace import constants from karapace.backup.backends.v1 import SchemaBackupV1Reader @@ -255,6 +263,16 @@ def _consumer(config: Config, topic: str) -> Iterator[KafkaConsumer]: yield consumer +@contextlib.contextmanager +def _enable_producer_callback_errors() -> Iterator[None]: + global_value = Future.error_on_callbacks + Future.error_on_callbacks = True + try: + yield None + finally: + Future.error_on_callbacks = global_value + + @contextlib.contextmanager def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]: """Creates an automatically closing Kafka producer client. @@ -264,9 +282,10 @@ def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]: :raises PartitionCountError: if the topic does not have exactly one partition. :raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation. """ - with kafka_producer_from_config(config) as producer: - __check_partition_count(topic, producer.partitions_for) - yield producer + with _enable_producer_callback_errors(): + with kafka_producer_from_config(config) as producer: + __check_partition_count(topic, producer.partitions_for) + yield producer def _normalize_location(input_location: str) -> Path | StdOut: @@ -372,7 +391,7 @@ def _handle_restore_topic( def _raise_backup_error(exception: Exception) -> NoReturn: - raise BackupError("Error while producing restored messages") from exception + raise BackupDataRestorationError("Error while producing restored messages") from exception def _handle_producer_send( @@ -384,14 +403,17 @@ def _handle_producer_send( instruction.key, instruction.value, ) - producer.send( - instruction.topic_name, - key=instruction.key, - value=instruction.value, - partition=instruction.partition_index, - headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], - timestamp_ms=instruction.timestamp, - ).add_errback(_raise_backup_error) + try: + producer.send( + instruction.topic_name, + key=instruction.key, + value=instruction.value, + partition=instruction.partition_index, + headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers], + timestamp_ms=instruction.timestamp, + ).add_errback(_raise_backup_error) + except (KafkaError, AssertionError) as ex: + raise BackupDataRestorationError("Error while calling send on restoring messages") from ex def restore_backup( diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index ffa6ae14d..8edb431f6 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -7,13 +7,14 @@ from __future__ import annotations from . import api -from .errors import StaleConsumerError +from .errors import BackupDataRestorationError, StaleConsumerError from .poll_timeout import PollTimeout from karapace.backup.api import VerifyLevel from karapace.config import Config, read_config import argparse import sys +import traceback def parse_args() -> argparse.Namespace: @@ -95,12 +96,16 @@ def dispatch(args: argparse.Namespace) -> None: api.verify(api.locate_backup_file(location), level=VerifyLevel(args.level)) elif args.command == "restore": config = get_config(args) - api.restore_backup( - config=config, - backup_location=api.locate_backup_file(location), - topic_name=api.normalize_topic_name(args.topic, config), - skip_topic_creation=args.skip_topic_creation, - ) + try: + api.restore_backup( + config=config, + backup_location=api.locate_backup_file(location), + topic_name=api.normalize_topic_name(args.topic, config), + skip_topic_creation=args.skip_topic_creation, + ) + except BackupDataRestorationError: + traceback.print_exc() + sys.exit(3) elif args.command == "export-anonymized-avro-schemas": config = get_config(args) api.create_backup( diff --git a/karapace/backup/errors.py b/karapace/backup/errors.py index ec739d6ef..364052cc5 100644 --- a/karapace/backup/errors.py +++ b/karapace/backup/errors.py @@ -24,6 +24,10 @@ class BackupTopicAlreadyExists(BackupError): pass +class BackupDataRestorationError(BackupError): + pass + + class StaleConsumerError(BackupError, RuntimeError): """Raised when the backup consumer does not make any progress and has not reached the last record in the topic.""" diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 89277719b..54f700b78 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -8,12 +8,12 @@ from kafka import KafkaAdminClient, KafkaProducer, TopicPartition from kafka.admin import ConfigResource, ConfigResourceType, NewTopic from kafka.consumer.fetcher import ConsumerRecord -from kafka.errors import KafkaTimeoutError, UnknownTopicOrPartitionError +from kafka.errors import UnknownTopicOrPartitionError from karapace.backup import api from karapace.backup.api import _consume_records, TopicName from karapace.backup.backends.v3.readers import read_metadata from karapace.backup.backends.v3.schema import Metadata -from karapace.backup.errors import EmptyPartition +from karapace.backup.errors import BackupDataRestorationError, EmptyPartition from karapace.backup.poll_timeout import PollTimeout from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults @@ -334,6 +334,46 @@ def test_errors_when_omitting_replication_factor(config_file: Path) -> None: assert "the following arguments are required: --replication-factor" in exc_info.value.stderr.decode() +def test_exits_with_return_code_3_for_data_restoration_error( + config_file: Path, + admin_client: KafkaAdminClient, +) -> None: + topic_name = "a-topic" + location = ( + Path(__file__).parent.parent.resolve() + / "test_data" + / "backup_v3_corrupt_last_record_bit_flipped_no_checkpoints" + / f"{topic_name}.metadata" + ) + + # Make sure topic doesn't exist beforehand. + try: + admin_client.delete_topics([topic_name]) + except UnknownTopicOrPartitionError: + print("No previously existing topic.") + else: + print("Deleted topic from previous run.") + + admin_client.create_topics([NewTopic(topic_name, 1, 1)]) + with pytest.raises(subprocess.CalledProcessError) as er: + subprocess.run( + [ + "karapace_schema_backup", + "restore", + "--config", + str(config_file), + "--topic", + topic_name, + "--location", + str(location), + "--skip-topic-creation", + ], + capture_output=True, + check=True, + ) + assert er.value.returncode == 3 + + def test_roundtrip_from_file( tmp_path: Path, config_file: Path, @@ -571,7 +611,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback): with patch("karapace.backup.api._producer") as p: p.return_value = LowTimeoutProducer() - with pytest.raises(KafkaTimeoutError): + with pytest.raises(BackupDataRestorationError): api.restore_backup( config=config, backup_location=metadata_path,