From 7a3c8395358a34ff80210db8eaa7e95eb13e5dbb Mon Sep 17 00:00:00 2001
From: Jonas Keeling
Date: Wed, 30 Oct 2024 16:07:31 +0100
Subject: [PATCH 1/3] fix: proto3 optionals from base64 encoded proto
 descriptors

This fixes deserialization of optionals in proto3 descriptors: every proto3
optional field is represented by a single-field "synthetic" oneof, which must
not be parsed as a user-declared oneof.
---
 src/karapace/protobuf/serialization.py            | 20 ++++++++++++++++---
 tests/schemas/protobuf.py                         | 18 +++++++++++++++++
 .../test_protobuf_binary_serialization.py         |  4 ++++
 3 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/src/karapace/protobuf/serialization.py b/src/karapace/protobuf/serialization.py
index 6c3ca61fd..123e80c8f 100644
--- a/src/karapace/protobuf/serialization.py
+++ b/src/karapace/protobuf/serialization.py
@@ -93,17 +93,31 @@ def _deserialize_msg(msgtype: Any) -> MessageElement:
     for nested_enum in msgtype.enum_type:
         nested_types.append(_deserialize_enum(nested_enum))
 
-    one_ofs: list[OneOfElement] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
+    one_ofs: list[OneOfElement | None] = [OneOfElement(oneof.name) for oneof in msgtype.oneof_decl]
     for f in msgtype.field:
         sf = _deserialize_field(f)
-        if f.HasField("oneof_index"):
+        is_oneof = f.HasField("oneof_index")
+        is_proto3_optional = f.HasField("oneof_index") and f.HasField("proto3_optional") and f.proto3_optional
+        if is_proto3_optional:
+            # Every proto3 optional field is placed into a single-field oneof, called a "synthetic" oneof
+            # because it was not present in the source .proto file.
+            # Clearing the entry ensures these optionals are not interpreted as oneofs.
+            one_ofs[f.oneof_index] = None
+            fields.append(sf)
+        elif is_oneof:
             one_ofs[f.oneof_index].fields.append(sf)
         else:
             fields.append(sf)
 
+    one_ofs_filtered: list[OneOfElement] = [oneof for oneof in one_ofs if oneof is not None]
     return MessageElement(
-        DEFAULT_LOCATION, msgtype.name, nested_types=nested_types, reserveds=reserveds, fields=fields, one_ofs=one_ofs
+        DEFAULT_LOCATION,
+        msgtype.name,
+        nested_types=nested_types,
+        reserveds=reserveds,
+        fields=fields,
+        one_ofs=one_ofs_filtered,
     )

diff --git a/tests/schemas/protobuf.py b/tests/schemas/protobuf.py
index 1bd3f05d5..afbb3f890 100644
--- a/tests/schemas/protobuf.py
+++ b/tests/schemas/protobuf.py
@@ -261,3 +261,21 @@
     "lzdGVyLk1ldGFkYXRhEhYKDmNvbXBhbnlfbnVtYmVyGAIgASgJGhYKCE1ldGFkYXRhEgoK"
     "AmlkGAEgASgJYgZwcm90bzM="
 )
+
+schema_protobuf_optionals_bin = (
+    "Cgp0ZXN0LnByb3RvIqYBCgpEaW1lbnNpb25zEhEKBHNpemUYASABKAFIAIgBARISCgV3aWR0aBgCIAEoAUgBiAEBEhMKBmhlaWdodBgDIAEo"
+    "AUgCiAEBEhMKBmxlbmd0aBgEIAEoAUgDiAEBEhMKBndlaWdodBgFIAEoAUgEiAEBQgcKBV9zaXplQggKBl93aWR0aEIJCgdfaGVpZ2h0Qg"
+    "kKB19sZW5ndGhCCQoHX3dlaWdodGIGcHJvdG8z"
+)
+
+schema_protobuf_optionals = """\
+syntax = "proto3";
+
+message Dimensions {
+  optional double size = 1;
+  optional double width = 2;
+  optional double height = 3;
+  optional double length = 4;
+  optional double weight = 5;
+}
+"""

diff --git a/tests/unit/test_protobuf_binary_serialization.py b/tests/unit/test_protobuf_binary_serialization.py
index 6950066d3..99bfe375e 100644
--- a/tests/unit/test_protobuf_binary_serialization.py
+++ b/tests/unit/test_protobuf_binary_serialization.py
@@ -16,6 +16,8 @@
     schema_protobuf_nested_message4_bin_protoc,
     schema_protobuf_oneof,
     schema_protobuf_oneof_bin,
+    schema_protobuf_optionals,
+    schema_protobuf_optionals_bin,
     schema_protobuf_order_after,
     schema_protobuf_order_after_bin,
     schema_protobuf_plain,
@@ -89,6 +91,7 @@
         (schema_protobuf_references, schema_protobuf_references_bin),
         (schema_protobuf_references2, schema_protobuf_references2_bin),
         (schema_protobuf_complex, schema_protobuf_complex_bin),
+        (schema_protobuf_optionals, schema_protobuf_optionals_bin),
     ],
 )
 def test_schema_deserialize(schema_plain, schema_serialized):
@@ -125,6 +128,7 @@ def test_protoc_serialized_schema_deserialize(schema_plain, schema_serialized):
         schema_protobuf_references,
         schema_protobuf_references2,
         schema_protobuf_complex,
+        schema_protobuf_optionals,
     ],
 )
 def test_simple_schema_serialize(schema):

From 6336082170c1045c3c997d53266cab781fc7df7d Mon Sep 17 00:00:00 2001
From: Francesco D'Orlandi
Date: Thu, 31 Oct 2024 18:46:52 +0100
Subject: [PATCH 2/3] backup: add command line flag to override replication
 factor in restore command

---
 src/karapace/backup/api.py                 | 12 ++++-
 src/karapace/backup/cli.py                 | 10 +++++
 tests/integration/backup/test_v3_backup.py | 52 +++++++++++++++++++++-
 3 files changed, 71 insertions(+), 3 deletions(-)

diff --git a/src/karapace/backup/api.py b/src/karapace/backup/api.py
index d06c99ebe..3da9a2304 100644
--- a/src/karapace/backup/api.py
+++ b/src/karapace/backup/api.py
@@ -373,13 +373,20 @@ def _handle_restore_topic(
     instruction: RestoreTopic,
     config: Config,
     skip_topic_creation: bool = False,
+    override_replication_factor: int | None = None,
 ) -> None:
     if skip_topic_creation:
         return
+    repl_factor = instruction.replication_factor
+    if override_replication_factor is not None:
+        LOG.info(
+            "Overriding replication factor with: %d (was: %d)", override_replication_factor, instruction.replication_factor
+        )
+        repl_factor = override_replication_factor
     if not _maybe_create_topic(
         config=config,
         name=instruction.topic_name,
-        replication_factor=instruction.replication_factor,
+        replication_factor=repl_factor,
         topic_configs=instruction.topic_configs,
     ):
         raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists")
@@ -426,6 +433,7 @@ def restore_backup(
     backup_location: ExistingFile,
     topic_name: TopicName,
     skip_topic_creation: bool = False,
+    override_replication_factor: int | None = None,
 ) -> None:
     """Restores a backup from the specified location into the configured topic.
 
@@ -475,7 +483,7 @@ def _check_producer_exception() -> None:
             _handle_restore_topic_legacy(instruction, config, skip_topic_creation)
             producer = stack.enter_context(_producer(config, instruction.topic_name))
         elif isinstance(instruction, RestoreTopic):
-            _handle_restore_topic(instruction, config, skip_topic_creation)
+            _handle_restore_topic(instruction, config, skip_topic_creation, override_replication_factor)
             producer = stack.enter_context(_producer(config, instruction.topic_name))
         elif isinstance(instruction, ProducerSend):
             if producer is None:
diff --git a/src/karapace/backup/cli.py b/src/karapace/backup/cli.py
index 5e3d72854..7125b1e04 100644
--- a/src/karapace/backup/cli.py
+++ b/src/karapace/backup/cli.py
@@ -76,6 +76,15 @@ def parse_args() -> argparse.Namespace:
         ),
     )
 
+    parser_restore.add_argument(
+        "--override-replication-factor",
+        help=(
+            "Override the replication factor that is saved in the backup. This is needed when restoring a backup from a "
+            "downsized cluster (like scaling down from 6 to 3 nodes). This has an effect only for V3 backups."
+        ),
+        type=int,
+    )
+
     return parser.parse_args()
@@ -115,6 +124,7 @@ def dispatch(args: argparse.Namespace) -> None:
                 backup_location=api.locate_backup_file(location),
                 topic_name=api.normalize_topic_name(args.topic, config),
                 skip_topic_creation=args.skip_topic_creation,
+                override_replication_factor=args.override_replication_factor,
             )
         except BackupDataRestorationError:
             traceback.print_exc()
diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py
index 332b09f0a..6f2e5df35 100644
--- a/tests/integration/backup/test_v3_backup.py
+++ b/tests/integration/backup/test_v3_backup.py
@@ -4,7 +4,7 @@
 """
 from __future__ import annotations
 
-from aiokafka.errors import UnknownTopicOrPartitionError
+from aiokafka.errors import InvalidReplicationFactorError, UnknownTopicOrPartitionError
 from collections.abc import Iterator
 from confluent_kafka import Message, TopicPartition
 from confluent_kafka.admin import NewTopic
@@ -698,6 +698,56 @@ def __exit__(self, exc_type, exc_value, exc_traceback):
     )
 
 
+def test_backup_restoration_override_replication_factor(
+    admin_client: KafkaAdminClient,
+    kafka_servers: KafkaServers,
+    producer: KafkaProducer,
+    new_topic: NewTopic,
+) -> None:
+    backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / new_topic.topic
+    metadata_path = backup_directory / f"{new_topic.topic}.metadata"
+    config = set_config_defaults(
+        {
+            "bootstrap_uri": kafka_servers.bootstrap_servers,
+        }
+    )
+
+    # Populate the topic and create a backup.
+    for i in range(10):
+        producer.send(
+            new_topic.topic,
+            key=f"message-key-{i}",
+            value=f"message-value-{i}-" + 1000 * "X",
+        )
+    producer.flush()
+    api.create_backup(
+        config=config,
+        backup_location=backup_directory,
+        topic_name=TopicName(new_topic.topic),
+        version=BackupVersion.V3,
+        replication_factor=6,
+    )
+
+    # Make sure the topic doesn't exist beforehand.
+    _delete_topic(admin_client, new_topic.topic)
+
+    # Assert that the restore would fail without the replication factor override.
+    with pytest.raises(InvalidReplicationFactorError):
+        api.restore_backup(
+            config=config,
+            backup_location=metadata_path,
+            topic_name=TopicName(new_topic.topic),
+        )
+
+    # Finally restore the backup with the override.
+    api.restore_backup(
+        config=config,
+        backup_location=metadata_path,
+        topic_name=TopicName(new_topic.topic),
+        override_replication_factor=1,
+    )
+
+
 def no_color_env() -> dict[str, str]:
     env = os.environ.copy()
     try:

From 0e8e831da788bd14e23bd0c9e87b2395f966aeb2 Mon Sep 17 00:00:00 2001
From: Kevin Michel
Date: Thu, 7 Nov 2024 01:39:57 +0100
Subject: [PATCH 3/3] fix: don't send all aiokafka log.error to sentry

These log events are numerous and costly. When a `log.error` event reflects a
real error, it is accompanied by an exception that is still properly handled
and recorded by Sentry, so there is no need to also send an individual event
for every `log.error` call. The same logic was already applied for the
`kafka` library.
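
For context, a minimal sketch of the suppression mechanism relied on here
(illustrative only: the DSN is a placeholder, and the wildcard form assumes
sentry_sdk matches ignored logger names as fnmatch patterns, as recent SDK
versions do):

    import logging

    import sentry_sdk
    from sentry_sdk.integrations.logging import ignore_logger

    sentry_sdk.init(dsn="https://public@example.ingest.sentry.io/0")  # placeholder DSN

    # Records emitted through matching loggers are no longer turned into
    # Sentry events; exceptions captured by the SDK are unaffected.
    ignore_logger("aiokafka")    # the library's root logger
    ignore_logger("aiokafka.*")  # child loggers such as "aiokafka.consumer"

    # Dropped by Sentry, but still written to ordinary log handlers:
    logging.getLogger("aiokafka.consumer").error("group coordinator unavailable")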
---
 src/karapace/sentry/sentry_client.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/karapace/sentry/sentry_client.py b/src/karapace/sentry/sentry_client.py
index 776214c7f..c4dc99d33 100644
--- a/src/karapace/sentry/sentry_client.py
+++ b/src/karapace/sentry/sentry_client.py
@@ -41,6 +41,8 @@ def _initialize_sentry(self) -> None:
         # Don't send library logged errors to Sentry as there is also proper return value or raised exception to calling code
         from sentry_sdk.integrations.logging import ignore_logger
 
+        ignore_logger("aiokafka")
+        ignore_logger("aiokafka.*")
         ignore_logger("kafka")
         ignore_logger("kafka.*")
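
Note on PATCH 1/3, for readers unfamiliar with synthetic oneofs: a small
self-contained sketch of the descriptor shape the fix detects, built with
google.protobuf.descriptor_pb2 (the message and field names below are
illustrative, mirroring the Dimensions test schema):

    from google.protobuf import descriptor_pb2

    fd = descriptor_pb2.FileDescriptorProto(name="test.proto", syntax="proto3")
    msg = fd.message_type.add(name="Dimensions")

    # `optional double size = 1;` compiles to a regular field plus a
    # single-field "synthetic" oneof named "_size" that wraps it.
    field = msg.field.add(
        name="size",
        number=1,
        type=descriptor_pb2.FieldDescriptorProto.TYPE_DOUBLE,
        label=descriptor_pb2.FieldDescriptorProto.LABEL_OPTIONAL,
        proto3_optional=True,
        oneof_index=0,
    )
    msg.oneof_decl.add(name="_size")

    # This is the combination _deserialize_msg now checks for, so the field
    # is emitted as a plain optional rather than a one-field oneof:
    assert field.HasField("oneof_index") and field.proto3_optional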