Skip to content

Commit

Permalink
feature: Return specific exit code for failed data restoration
Browse files Browse the repository at this point in the history
`karapace_schema_backup restore` now exits with code 3 when an error is
raised while restoring data. This lets clients distinguish a data
restoration error from a generic error, so that they can correctly retry
the restoration command.
  • Loading branch information
giuseppelillo committed Jun 14, 2023
1 parent addddff commit 460603e
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 23 deletions.
48 changes: 35 additions & 13 deletions karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,14 @@
from .backends.v3.schema import ChecksumAlgorithm
from .backends.writer import BackupWriter, StdOut
from .encoders import encode_key, encode_value
from .errors import BackupError, BackupTopicAlreadyExists, EmptyPartition, PartitionCountError, StaleConsumerError
from .errors import (
BackupDataRestorationError,
BackupError,
BackupTopicAlreadyExists,
EmptyPartition,
PartitionCountError,
StaleConsumerError,
)
from .poll_timeout import PollTimeout
from .topic_configurations import ConfigSource, get_topic_configurations
from enum import Enum
Expand All @@ -20,6 +27,7 @@
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaError, TopicAlreadyExistsError
from kafka.future import Future
from kafka.structs import PartitionMetadata, TopicPartition
from karapace import constants
from karapace.backup.backends.v1 import SchemaBackupV1Reader
Expand Down Expand Up @@ -255,6 +263,16 @@ def _consumer(config: Config, topic: str) -> Iterator[KafkaConsumer]:
yield consumer


@contextlib.contextmanager
def _enable_producer_callback_errors() -> Iterator[None]:
    """Temporarily set ``Future.error_on_callbacks`` to ``True``.

    The value the class attribute held on entry is remembered and written
    back on exit, including when the managed block raises.
    """
    previous_setting = Future.error_on_callbacks
    Future.error_on_callbacks = True
    try:
        yield
    finally:
        # Always put back whatever was configured before entering the context.
        Future.error_on_callbacks = previous_setting


@contextlib.contextmanager
def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
"""Creates an automatically closing Kafka producer client.
Expand All @@ -264,9 +282,10 @@ def _producer(config: Config, topic: str) -> Iterator[KafkaProducer]:
:raises PartitionCountError: if the topic does not have exactly one partition.
:raises Exception: if client creation fails, concrete exception types are unknown, see Kafka implementation.
"""
with kafka_producer_from_config(config) as producer:
__check_partition_count(topic, producer.partitions_for)
yield producer
with _enable_producer_callback_errors():
with kafka_producer_from_config(config) as producer:
__check_partition_count(topic, producer.partitions_for)
yield producer


def _normalize_location(input_location: str) -> Path | StdOut:
Expand Down Expand Up @@ -372,7 +391,7 @@ def _handle_restore_topic(


def _raise_backup_error(exception: Exception) -> NoReturn:
raise BackupError("Error while producing restored messages") from exception
raise BackupDataRestorationError("Error while producing restored messages") from exception


def _handle_producer_send(
Expand All @@ -384,14 +403,17 @@ def _handle_producer_send(
instruction.key,
instruction.value,
)
producer.send(
instruction.topic_name,
key=instruction.key,
value=instruction.value,
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp_ms=instruction.timestamp,
).add_errback(_raise_backup_error)
try:
producer.send(
instruction.topic_name,
key=instruction.key,
value=instruction.value,
partition=instruction.partition_index,
headers=[(key.decode() if key is not None else None, value) for key, value in instruction.headers],
timestamp_ms=instruction.timestamp,
).add_errback(_raise_backup_error)
except (KafkaError, AssertionError) as ex:
raise BackupDataRestorationError("Error while calling send on restoring messages") from ex


def restore_backup(
Expand Down
19 changes: 12 additions & 7 deletions karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
from __future__ import annotations

from . import api
from .errors import StaleConsumerError
from .errors import BackupDataRestorationError, StaleConsumerError
from .poll_timeout import PollTimeout
from karapace.backup.api import VerifyLevel
from karapace.config import Config, read_config

import argparse
import sys
import traceback


def parse_args() -> argparse.Namespace:
Expand Down Expand Up @@ -95,12 +96,16 @@ def dispatch(args: argparse.Namespace) -> None:
api.verify(api.locate_backup_file(location), level=VerifyLevel(args.level))
elif args.command == "restore":
config = get_config(args)
api.restore_backup(
config=config,
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
)
try:
api.restore_backup(
config=config,
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
)
except BackupDataRestorationError:
traceback.print_exc()
sys.exit(3)
elif args.command == "export-anonymized-avro-schemas":
config = get_config(args)
api.create_backup(
Expand Down
4 changes: 4 additions & 0 deletions karapace/backup/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ class BackupTopicAlreadyExists(BackupError):
pass


class BackupDataRestorationError(BackupError):
    """Raised when an error occurs while producing restored backup messages."""


class StaleConsumerError(BackupError, RuntimeError):
"""Raised when the backup consumer does not make any progress and has not reached the last record in the topic."""

Expand Down
46 changes: 43 additions & 3 deletions tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@
from kafka import KafkaAdminClient, KafkaProducer, TopicPartition
from kafka.admin import ConfigResource, ConfigResourceType, NewTopic
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import KafkaTimeoutError, UnknownTopicOrPartitionError
from kafka.errors import UnknownTopicOrPartitionError
from karapace.backup import api
from karapace.backup.api import _consume_records, TopicName
from karapace.backup.backends.v3.readers import read_metadata
from karapace.backup.backends.v3.schema import Metadata
from karapace.backup.errors import EmptyPartition
from karapace.backup.errors import BackupDataRestorationError, EmptyPartition
from karapace.backup.poll_timeout import PollTimeout
from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations
from karapace.config import Config, set_config_defaults
Expand Down Expand Up @@ -334,6 +334,46 @@ def test_errors_when_omitting_replication_factor(config_file: Path) -> None:
assert "the following arguments are required: --replication-factor" in exc_info.value.stderr.decode()


def test_exits_with_return_code_3_for_data_restoration_error(
    config_file: Path,
    admin_client: KafkaAdminClient,
) -> None:
    """Check that the ``restore`` command exits with code 3 for the corrupt-record fixture."""
    topic_name = "a-topic"
    test_data_dir = Path(__file__).parent.parent.resolve() / "test_data"
    location = (
        test_data_dir
        / "backup_v3_corrupt_last_record_bit_flipped_no_checkpoints"
        / f"{topic_name}.metadata"
    )

    # Start from a clean slate: drop any topic left over from an earlier run.
    try:
        admin_client.delete_topics([topic_name])
    except UnknownTopicOrPartitionError:
        print("No previously existing topic.")
    else:
        print("Deleted topic from previous run.")

    # The topic is created here because the command is run with --skip-topic-creation.
    admin_client.create_topics([NewTopic(topic_name, 1, 1)])

    restore_command = [
        "karapace_schema_backup",
        "restore",
        "--config",
        str(config_file),
        "--topic",
        topic_name,
        "--location",
        str(location),
        "--skip-topic-creation",
    ]
    # check=True turns the non-zero exit status into CalledProcessError.
    with pytest.raises(subprocess.CalledProcessError) as exc_info:
        subprocess.run(restore_command, capture_output=True, check=True)
    assert exc_info.value.returncode == 3


def test_roundtrip_from_file(
tmp_path: Path,
config_file: Path,
Expand Down Expand Up @@ -571,7 +611,7 @@ def __exit__(self, exc_type, exc_value, exc_traceback):

with patch("karapace.backup.api._producer") as p:
p.return_value = LowTimeoutProducer()
with pytest.raises(KafkaTimeoutError):
with pytest.raises(BackupDataRestorationError):
api.restore_backup(
config=config,
backup_location=metadata_path,
Expand Down

0 comments on commit 460603e

Please sign in to comment.