Skip to content

Commit

Permalink
Merge branch 'main' into fix/return-compatible-when-schema-not-there
Browse files Browse the repository at this point in the history
  • Loading branch information
Enis Mustafaj authored and Enis Mustafaj committed Nov 13, 2024
2 parents c252a24 + 1ea3ad5 commit e1dddac
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 6 deletions.
12 changes: 10 additions & 2 deletions src/karapace/backup/api.py
Original file line number Diff line number Diff line change
def _handle_restore_topic(
    instruction: RestoreTopic,
    config: Config,
    skip_topic_creation: bool = False,
    override_replication_factor: int | None = None,
) -> None:
    """Ensure the topic targeted by a restore exists, creating it unless told to skip.

    When ``override_replication_factor`` is given it takes precedence over the
    replication factor recorded in the backup — useful when restoring onto a
    cluster smaller than the one the backup was taken from.

    Raises ``BackupTopicAlreadyExists`` if the topic is already present.
    """
    if skip_topic_creation:
        return

    if override_replication_factor is None:
        replication_factor = instruction.replication_factor
    else:
        LOG.info(
            "Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
        )
        replication_factor = override_replication_factor

    created = _maybe_create_topic(
        config=config,
        name=instruction.topic_name,
        replication_factor=replication_factor,
        topic_configs=instruction.topic_configs,
    )
    if not created:
        raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
Expand Down Expand Up @@ -426,6 +433,7 @@ def restore_backup(
backup_location: ExistingFile,
topic_name: TopicName,
skip_topic_creation: bool = False,
override_replication_factor: int | None = None,
) -> None:
"""Restores a backup from the specified location into the configured topic.
Expand Down Expand Up @@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
_handle_restore_topic_legacy(instruction, config, skip_topic_creation)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, RestoreTopic):
_handle_restore_topic(instruction, config, skip_topic_creation)
_handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
producer = stack.enter_context(_producer(config, instruction.topic_name))
elif isinstance(instruction, ProducerSend):
if producer is None:
Expand Down
10 changes: 10 additions & 0 deletions src/karapace/backup/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
),
)

parser_restore.add_argument(
"--override-replication-factor",
help=(
"Override the replication factor that is save in the backup. This is needed when restoring a backup from a"
"downsized cluster (like scaling down from 6 to 3 nodes). This has effect only for V3 backups."
),
type=int,
)

return parser.parse_args()


Expand Down Expand Up @@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
backup_location=api.locate_backup_file(location),
topic_name=api.normalize_topic_name(args.topic, config),
skip_topic_creation=args.skip_topic_creation,
override_replication_factor=args.override_replication_factor,
)
except BackupDataRestorationError:
traceback.print_exc()
Expand Down
20 changes: 17 additions & 3 deletions src/karapace/protobuf/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,17 +93,31 @@ def _deserialize_msg(msgtype: Any) -> MessageElement:
for nested_enum in msgtype.enum_type:
nested_types.append(_deserialize_enum(nested_enum))

one_ofs: list[OneOfElement] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
one_ofs: list[OneOfElement | None] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]

for f in msgtype.field:
sf = _deserialize_field(f)
if f.HasField("oneof_index"):
is_oneof = f.HasField("oneof_index")
is_proto3_optional = f.HasField("oneof_index") and f.HasField("proto3_optional") and f.proto3_optional
if is_proto3_optional:
# Every proto3 optional field is placed into a one-field oneof, called a "synthetic" oneof,
# as it was not present in the source .proto file.
# This will make sure that we don't interpret those optionals as oneof.
one_ofs[f.oneof_index] = None
fields.append(sf)
elif is_oneof:
one_ofs[f.oneof_index].fields.append(sf)
else:
fields.append(sf)

one_ofs_filtered: list[OneOfElement] = [oneof for oneof in one_ofs if oneof is not None]
return MessageElement(
DEFAULT_LOCATION, msgtype.name, nested_types=nested_types, reserveds=reserveds, fields=fields, one_ofs=one_ofs
DEFAULT_LOCATION,
msgtype.name,
nested_types=nested_types,
reserveds=reserveds,
fields=fields,
one_ofs=one_ofs_filtered,
)


Expand Down
2 changes: 2 additions & 0 deletions src/karapace/sentry/sentry_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ def _initialize_sentry(self) -> None:
# Don't send library logged errors to Sentry as there is also proper return value or raised exception to calling code
from sentry_sdk.integrations.logging import ignore_logger

ignore_logger("aiokafka")
ignore_logger("aiokafka.*")
ignore_logger("kafka")
ignore_logger("kafka.*")

Expand Down
52 changes: 51 additions & 1 deletion tests/integration/backup/test_v3_backup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"""
from __future__ import annotations

from aiokafka.errors import UnknownTopicOrPartitionError
from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
from collections.abc import Iterator
from confluent_kafka import Message, TopicPartition
from confluent_kafka.admin import NewTopic
Expand Down Expand Up @@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
)


def test_backup_restoration_override_replication_factor(
    admin_client: KafkaAdminClient,
    kafka_servers: KafkaServers,
    producer: KafkaProducer,
    new_topic: NewTopic,
) -> None:
    """Restoring a backup recorded with replication factor 6 onto a smaller
    cluster fails, unless ``override_replication_factor`` is supplied."""
    backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
    metadata_path = backup_directory / f"{new_topic.topic}.metadata"
    config = set_config_defaults({"bootstrap_uri": kafka_servers.bootstrap_servers})

    # Populate the topic, then snapshot it while recording a replication
    # factor the test cluster cannot satisfy.
    for i in range(10):
        producer.send(
            new_topic.topic,
            key=f"message-key-{i}",
            value=f"message-value-{i}-" + "X" * 1000,
        )
    producer.flush()
    api.create_backup(
        config=config,
        backup_location=backup_directory,
        topic_name=TopicName(new_topic.topic),
        version=BackupVersion.V3,
        replication_factor=6,
    )

    # The restore must recreate the topic, so drop it first.
    _delete_topic(admin_client, new_topic.topic)

    # Without the override, the recorded factor (6) is used and rejected.
    with pytest.raises(InvalidReplicationFactorError):
        api.restore_backup(
            config=config,
            backup_location=metadata_path,
            topic_name=TopicName(new_topic.topic),
        )

    # With the override, the topic is created with replication factor 1.
    api.restore_backup(
        config=config,
        backup_location=metadata_path,
        topic_name=TopicName(new_topic.topic),
        override_replication_factor=1,
    )


def no_color_env() -> dict[str, str]:
env = os.environ.copy()
try:
Expand Down
18 changes: 18 additions & 0 deletions tests/schemas/protobuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,3 +261,21 @@
"lzdGVyLk1ldGFkYXRhEhYKDmNvbXBhbnlfbnVtYmVyGAIgASgJGhYKCE1ldGFkYXRhEgoK"
"AmlkGAEgASgJYgZwcm90bzM="
)

# Base64-encoded serialized descriptor for the ``Dimensions`` schema below
# (adjacent string literals are concatenated by the parser).
schema_protobuf_optionals_bin = (
    "Cgp0ZXN0LnByb3RvIqYBCgpEaW1lbnNpb25zEhEKBHNpemUYASABKAFIAIgBARISCgV3aWR0aBgCIAEoAUgBiAEBEhMKBmhlaWdodBgDIAEo"
    "AUgCiAEBEhMKBmxlbmd0aBgEIAEoAUgDiAEBEhMKBndlaWdodBgFIAEoAUgEiAEBQgcKBV9zaXplQggKBl93aWR0aEIJCgdfaGVpZ2h0Qg"
    "kKB19sZW5ndGhCCQoHX3dlaWdodGIGcHJvdG8z"
)

# Source form of the same schema: proto3 message where every field is
# ``optional`` (each becomes a synthetic oneof in the descriptor).
schema_protobuf_optionals = """\
syntax = "proto3";
message Dimensions {
optional double size = 1;
optional double width = 2;
optional double height = 3;
optional double length = 4;
optional double weight = 5;
}
"""
4 changes: 4 additions & 0 deletions tests/unit/test_protobuf_binary_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
schema_protobuf_nested_message4_bin_protoc,
schema_protobuf_oneof,
schema_protobuf_oneof_bin,
schema_protobuf_optionals,
schema_protobuf_optionals_bin,
schema_protobuf_order_after,
schema_protobuf_order_after_bin,
schema_protobuf_plain,
Expand Down Expand Up @@ -89,6 +91,7 @@
(schema_protobuf_references, schema_protobuf_references_bin),
(schema_protobuf_references2, schema_protobuf_references2_bin),
(schema_protobuf_complex, schema_protobuf_complex_bin),
(schema_protobuf_optionals, schema_protobuf_optionals_bin),
],
)
def test_schema_deserialize(schema_plain, schema_serialized):
Expand Down Expand Up @@ -125,6 +128,7 @@ def test_protoc_serialized_schema_deserialize(schema_plain, schema_serialized):
schema_protobuf_references,
schema_protobuf_references2,
schema_protobuf_complex,
schema_protobuf_optionals,
],
)
def test_simple_schema_serialize(schema):
Expand Down

0 comments on commit e1dddac

Please sign in to comment.