Skip to content

Commit

Permalink
Merge pull request #988 from Aiven-Open/fdorlandi-hh-3944-invalide-re…
Browse files Browse the repository at this point in the history
…plication-factor

backup: add command line flag to override replication factor in restore command
  • Loading branch information
keejon authored Nov 5, 2024
2 parents e6f39bf + 6336082 commit ba66344
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 3 deletions.
12 changes: 10 additions & 2 deletions src/karapace/backup/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,13 +373,20 @@ def _handle_restore_topic(
instruction: RestoreTopic,
config: Config,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
if skip_topic_creation:
return
repl_factor = instruction.replication_factor
if override_replication_factor is not None:
LOG.info(
"Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
)
repl_factor = override_replication_factor
if not _maybe_create_topic(
config=config,
name=instruction.topic_name,
replication_factor=instruction.replication_factor,
replication_factor=repl_factor,
topic_configs=instruction.topic_configs,
):
raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
Expand Down Expand Up @@ -426,6 +433,7 @@ def restore_backup(
backup_location: ExistingFile,
topic_name: TopicName,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
"""Restores a backup from the specified location into the configured topic.
Expand Down Expand Up @@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, RestoreTopic):
_handle_restore_topic(instruction, config, skip_topic_creation)
_handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, ProducerSend):
if producer is None:
Expand Down
10 changes: 10 additions & 0 deletions src/karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
),
)

parser_restore.add_argument(
"--override-replication-factor",
help=(
"Override the replication factor that is save in the backup. This is needed when restoring a backup from a"
"downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups."
),
type=int,
)

return parser.parse_args()


Expand Down Expand Up @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
override_replication_factor=args.override_replication_factor,
)
except BackupDataRestorationError:
traceback.print_exc()
Expand Down
52 changes: 51 additions & 1 deletion tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from aiokafka.errors import UnknownTopicOrPartitionError
from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
from collections.abc import Iterator
from confluent_kafka import Message, TopicPartition
from confluent_kafka.admin import NewTopic
Expand Down Expand Up @@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_override_replication_factor(
admin_client: KafkaAdminClient,
kafka_servers: KafkaServers,
producer: KafkaProducer,
new_topic: NewTopic,
) -> None:
backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
metadata_path = backup_directory / f"{new_topic.topic}.metadata"
config = set_config_defaults(
{
"bootstrap_uri": kafka_servers.bootstrap_servers,
}
)

# pupulate the topic and create a backup
for i in range(10):
producer.send(
new_topic.topic,
key=f"message-key-{i}",
value=f"message-value-{i}-" + 1000 * "X",
)
producer.flush()
api.create_backup(
config=config,
backup_location=backup_directory,
topic_name=TopicName(new_topic.topic),
version=BackupVersion.V3,
replication_factor=6,
)

# make sure topic doesn't exist beforehand.
_delete_topic(admin_client, new_topic.topic)

# assert that the restore would fail without the replication factor override
with pytest.raises(InvalidReplicationFactorError):
api.restore_backup(
config=config,
backup_location=metadata_path,
topic_name=TopicName(new_topic.topic),
)

# finally restore the backup with override
api.restore_backup(
config=config,
backup_location=metadata_path,
topic_name=TopicName(new_topic.topic),
override_replication_factor=1,
)


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
Expand Down

0 comments on commit ba66344

Please sign in to comment.