From 4f30e209b936e500d476a6c23d048769f7bebdef Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Fri, 26 May 2023 17:26:08 +0200 Subject: [PATCH] feature: Add topic configurations to v3 format - added `topic_configurations` and `replication_factor` to Metadata schema - backed up configurations include all the custom topic configs and a set of default configurations - topic creation when restoring a backup with format v3 will fail if the topic already exists --- karapace/avro_dataclasses/introspect.py | 24 +++- karapace/avro_dataclasses/schema.py | 10 +- karapace/backup/api.py | 104 ++++++++++++------ karapace/backup/backends/reader.py | 18 ++- .../backup/backends/v3/avro/Metadata.avsc | 11 ++ karapace/backup/backends/v3/backend.py | 10 +- karapace/backup/backends/v3/schema.py | 4 +- karapace/backup/backends/writer.py | 4 +- karapace/backup/cli.py | 7 ++ karapace/backup/errors.py | 6 +- karapace/backup/topic_configurations.py | 64 +++++++++++ karapace/client.py | 3 +- .../backup/test_get_topic_configurations.py | 86 +++++++++++++++ tests/integration/backup/test_v3_backup.py | 59 +++++++--- .../a-topic.metadata | Bin 95 -> 139 bytes .../a-topic:123.data | Bin 88 -> 106 bytes .../a5f7a413.metadata | Bin 92 -> 148 bytes .../a5f7a413:0.data | Bin 112 -> 107 bytes .../0cdc85dc.metadata | Bin 0 -> 203 bytes .../{6595c9c2:0.data => 0cdc85dc:0.data} | Bin .../6595c9c2.metadata | Bin 92 -> 0 bytes tests/unit/backup/backends/test_v1.py | 8 +- tests/unit/backup/backends/test_v2.py | 12 +- tests/unit/backup/backends/v3/test_backend.py | 19 +++- .../a-topic.metadata | Bin 93 -> 139 bytes .../a-topic:123.data | Bin 65 -> 107 bytes .../backup/backends/v3/test_serialisation.py | 2 + tests/unit/backup/test_api.py | 36 ++++-- 28 files changed, 404 insertions(+), 83 deletions(-) create mode 100644 karapace/backup/topic_configurations.py create mode 100644 tests/integration/backup/test_get_topic_configurations.py create mode 100644 tests/integration/test_data/backup_v3_single_partition/0cdc85dc.metadata rename tests/integration/test_data/backup_v3_single_partition/{6595c9c2:0.data => 0cdc85dc:0.data} (100%) delete mode 100644 tests/integration/test_data/backup_v3_single_partition/6595c9c2.metadata diff --git a/karapace/avro_dataclasses/introspect.py b/karapace/avro_dataclasses/introspect.py index cac01d7e3..01f07d4a5 100644 --- a/karapace/avro_dataclasses/introspect.py +++ b/karapace/avro_dataclasses/introspect.py @@ -6,6 +6,7 @@ from __future__ import annotations from .schema import AvroType, FieldSchema, RecordSchema +from collections.abc import Mapping from dataclasses import Field, fields, is_dataclass, MISSING from enum import Enum from functools import lru_cache @@ -30,7 +31,7 @@ class UnsupportedAnnotation(NotImplementedError): ... -class UnderspecifiedArray(UnsupportedAnnotation): +class UnderspecifiedAnnotation(UnsupportedAnnotation): ... @@ -93,7 +94,7 @@ def _field_type(field: Field, type_: object) -> AvroType: # pylint: disable=too if origin in sequence_types: return _field_type_array(field, origin, type_) if type_ in sequence_types: - raise UnderspecifiedArray("Inner type must be specified for sequence types") + raise UnderspecifiedAnnotation("Inner type must be specified for sequence types") # Handle enums. if isinstance(type_, type) and issubclass(type_, Enum): @@ -107,6 +108,25 @@ def _field_type(field: Field, type_: object) -> AvroType: # pylint: disable=too } ) + # Handle map types. + if origin is Mapping: + args = get_args(type_) + if len(args) != 2: + raise UnderspecifiedAnnotation("Key and value types must be specified for map types") + if args[0] is not str: + raise UnsupportedAnnotation("Key type must be str") + return FieldSchema( + { + "type": "map", + "values": _field_type(field, args[1]), + **( + {"default": field.default_factory()} + if field.default_factory is not MISSING + else {} # type: ignore[misc] + ), + } + ) + raise NotImplementedError( f"Found an unknown type {type_!r} while assembling Avro schema for the field " f"{field.name!r}. The Avro dataclasses implementation likely needs to be " diff --git a/karapace/avro_dataclasses/schema.py b/karapace/avro_dataclasses/schema.py index 3cc42fdee..fdb524303 100644 --- a/karapace/avro_dataclasses/schema.py +++ b/karapace/avro_dataclasses/schema.py @@ -4,6 +4,7 @@ """ from __future__ import annotations +from collections.abc import Mapping from typing import Literal from typing_extensions import NotRequired, TypeAlias, TypedDict @@ -29,9 +30,16 @@ class EnumType(TypedDict): default: NotRequired[str] +class MapType(TypedDict): + name: str + type: Literal["map"] + values: AvroType + default: NotRequired[Mapping[str, AvroType]] + + TypeUnit: TypeAlias = "Primitive | TypeObject" UnionType: TypeAlias = "list[TypeUnit]" -AvroType: TypeAlias = "TypeUnit | UnionType | RecordSchema | ArrayType | EnumType" +AvroType: TypeAlias = "TypeUnit | UnionType | RecordSchema | ArrayType | EnumType | MapType" class FieldSchema(TypedDict): diff --git a/karapace/backup/api.py b/karapace/backup/api.py index 25ae6ee6e..4280efd9a 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -6,13 +6,14 @@ """ from __future__ import annotations -from .backends.reader import BaseBackupReader, BaseItemsBackupReader, ProducerSend, RestoreTopic +from .backends.reader import BaseBackupReader, BaseItemsBackupReader, ProducerSend, RestoreTopic, RestoreTopicLegacy from .backends.v3.constants import V3_MARKER from .backends.v3.schema import ChecksumAlgorithm from .backends.writer import BackupWriter, StdOut from .encoders import encode_key, encode_value -from .errors import BackupError, EmptyPartition, PartitionCountError, StaleConsumerError +from .errors import BackupError, BackupTopicAlreadyExists, EmptyPartition, PartitionCountError, StaleConsumerError from .poll_timeout import PollTimeout +from .topic_configurations import ConfigSource, get_topic_configurations from enum import Enum from functools import partial from kafka import KafkaConsumer, KafkaProducer @@ -27,12 +28,11 @@ from karapace.config import Config from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config from karapace.key_format import KeyFormatter -from karapace.schema_reader import new_schema_topic_from_config from karapace.utils import assert_never from pathlib import Path from rich.console import Console from tenacity import retry, retry_if_exception_type, RetryCallState, stop_after_delay, wait_fixed -from typing import AbstractSet, Callable, Collection, Iterator, Literal, NewType, NoReturn, TypeVar +from typing import AbstractSet, Callable, Collection, Iterator, Literal, Mapping, NewType, NoReturn, TypeVar import contextlib import datetime @@ -178,31 +178,27 @@ def _admin(config: Config) -> KafkaAdminClient: wait=wait_fixed(1), # seconds retry=retry_if_exception_type(KafkaError), ) -def _maybe_create_topic(config: Config, name: str, backup_version: BackupVersion) -> None: - if backup_version in {BackupVersion.V1, BackupVersion.V2}: - topic = new_schema_topic_from_config(config) - - if topic.name != name: - LOG.warning( - "Not creating topic, because the name %r from the config and the name %r from the CLI differ.", - topic.name, - name, - ) - return - else: - topic = NewTopic( - name=name, - num_partitions=1, - replication_factor=config["replication_factor"], - topic_configs={"cleanup.policy": "compact"}, - ) +def _maybe_create_topic( + name: str, + *, + config: Config, + replication_factor: int, + topic_configs: Mapping[str, str], +) -> bool: + """Returns True if topic creation was successful, False if topic already exists""" + topic = NewTopic( + name=name, + num_partitions=constants.SCHEMA_TOPIC_NUM_PARTITIONS, + replication_factor=replication_factor, + topic_configs=topic_configs, + ) with _admin(config) as admin: try: admin.create_topics([topic], timeout_ms=constants.TOPIC_CREATION_TIMEOUT_MS) except TopicAlreadyExistsError: LOG.debug("Topic %r already exists", topic.name) - return + return False LOG.info( "Created topic %r (partition count: %s, replication factor: %s, config: %s)", @@ -211,7 +207,7 @@ def _maybe_create_topic(config: Config, name: str, backup_version: BackupVersion topic.replication_factor, topic.topic_configs, ) - return + return True @contextlib.contextmanager @@ -307,18 +303,38 @@ def _write_partition( ) -def _handle_restore_topic( - instruction: RestoreTopic, +def _handle_restore_topic_legacy( + instruction: RestoreTopicLegacy, config: Config, - backup_version: BackupVersion, ) -> None: + if config["topic_name"] != instruction.topic_name: + LOG.warning( + "Not creating topic, because the name %r from the config and the name %r from the CLI differ.", + config["topic_name"], + instruction.topic_name, + ) + return _maybe_create_topic( config=config, - name=instruction.name, - backup_version=backup_version, + name=instruction.topic_name, + replication_factor=config["replication_factor"], + topic_configs={"cleanup.policy": "compact"}, ) +def _handle_restore_topic( + instruction: RestoreTopic, + config: Config, +) -> None: + if not _maybe_create_topic( + config=config, + name=instruction.topic_name, + replication_factor=instruction.replication_factor, + topic_configs=instruction.topic_configs, + ): + raise BackupTopicAlreadyExists(f"Topic to restore '{instruction.topic_name}' already exists") + + def _raise_backup_error(exception: Exception) -> NoReturn: raise BackupError("Error while producing restored messages") from exception @@ -347,6 +363,12 @@ def restore_backup( backup_location: Path | StdOut, topic_name: TopicName, ) -> None: + """Restores a backup from the specified location into the configured topic. + + :raises Exception: if production fails, concrete exception types are unknown, + see Kafka implementation. + :raises BackupTopicAlreadyExists: if backup version is V3 and topic already exists + """ if isinstance(backup_location, str): raise NotImplementedError("Cannot restore backups from stdin") @@ -377,9 +399,12 @@ def restore_backup( producer = None for instruction in backend.read(backup_location, topic_name): - if isinstance(instruction, RestoreTopic): - _handle_restore_topic(instruction, config, backup_version=backup_version) - producer = stack.enter_context(_producer(config, instruction.name)) + if isinstance(instruction, RestoreTopicLegacy): + _handle_restore_topic_legacy(instruction, config) + producer = stack.enter_context(_producer(config, instruction.topic_name)) + elif isinstance(instruction, RestoreTopic): + _handle_restore_topic(instruction, config) + producer = stack.enter_context(_producer(config, instruction.topic_name)) elif isinstance(instruction, ProducerSend): if producer is None: raise RuntimeError("Backend has not yet sent RestoreTopic.") @@ -396,6 +421,7 @@ def create_backup( *, poll_timeout: PollTimeout = PollTimeout.default(), overwrite: bool = False, + replication_factor: int | None = None, ) -> None: """Creates a backup of the configured topic. @@ -404,6 +430,9 @@ def create_backup( if not records are received within that time and the target offset has not been reached an exception is raised. Defaults to one minute. :param overwrite: the output file if it exists. + :param replication_factor: if specified, metadata will contain this value for + replication factor, instead of the one that would be retrieved from + the current topic state. :raises Exception: if consumption fails, concrete exception types are unknown, see Kafka implementation. @@ -416,6 +445,8 @@ def create_backup( """ if version is BackupVersion.V3 and not isinstance(backup_location, Path): raise RuntimeError("Backup format version 3 does not support writing to stdout.") + if version is BackupVersion.V3 and replication_factor is None: + raise RuntimeError("Backup format version 3 needs a replication factor to be specified.") start_time = datetime.datetime.now(datetime.timezone.utc) backend = version.writer() @@ -426,6 +457,10 @@ def create_backup( version.name, topic_name, ) + with _admin(config) as admin: + topic_configurations = get_topic_configurations( + admin=admin, topic_name=topic_name, config_source_filter={ConfigSource.TOPIC_CONFIG} + ) # Note: It's expected that we at some point want to introduce handling of # multi-partition topics here. The backend interface is built with that in @@ -464,6 +499,8 @@ def create_backup( started_at=start_time, finished_at=end_time, partition_count=1, + replication_factor=replication_factor if replication_factor is not None else config["replication_factor"], + topic_configurations=topic_configurations, data_files=[data_file] if data_file else [], ) @@ -506,6 +543,9 @@ def inspect(backup_location: Path | StdOut) -> None: "topic_name": metadata.topic_name, "topic_id": None if metadata.topic_id is None else str(metadata.topic_id), "partition_count": metadata.partition_count, + "record_count": metadata.record_count, + "replication_factor": metadata.replication_factor, + "topic_configurations": metadata.topic_configurations, "checksum_algorithm": metadata.checksum_algorithm.value, "data_files": tuple( { diff --git a/karapace/backup/backends/reader.py b/karapace/backup/backends/reader.py index 02a7d9fa4..d4caadda2 100644 --- a/karapace/backup/backends/reader.py +++ b/karapace/backup/backends/reader.py @@ -7,7 +7,7 @@ from karapace.dataclasses import default_dataclass from karapace.typing import JsonData, JsonObject from pathlib import Path -from typing import Callable, ClassVar, Final, Generator, IO, Iterator, Optional, Sequence, TypeVar, Union +from typing import Callable, ClassVar, Final, Generator, IO, Iterator, Mapping, Optional, Sequence, TypeVar, Union from typing_extensions import TypeAlias import abc @@ -17,10 +17,18 @@ PARTITION_ZERO: Final = 0 +@default_dataclass +class RestoreTopicLegacy: + topic_name: str + partition_count: int + + @default_dataclass class RestoreTopic: - name: str + topic_name: str partition_count: int + replication_factor: int + topic_configs: Mapping[str, str] @default_dataclass @@ -33,7 +41,7 @@ class ProducerSend: timestamp: int | None = None -Instruction: TypeAlias = "RestoreTopic | ProducerSend" +Instruction: TypeAlias = "RestoreTopicLegacy | RestoreTopic | ProducerSend" KeyEncoder: TypeAlias = Callable[[Union[JsonObject, str]], Optional[bytes]] @@ -78,8 +86,8 @@ def read( path: Path, topic_name: str, ) -> Generator[Instruction, None, None]: - yield RestoreTopic( - name=topic_name, + yield RestoreTopicLegacy( + topic_name=topic_name, partition_count=1, ) with path.open("r") as buffer: diff --git a/karapace/backup/backends/v3/avro/Metadata.avsc b/karapace/backup/backends/v3/avro/Metadata.avsc index 02e4c4bd3..718f32cde 100644 --- a/karapace/backup/backends/v3/avro/Metadata.avsc +++ b/karapace/backup/backends/v3/avro/Metadata.avsc @@ -48,6 +48,17 @@ "name": "partition_count", "type": "int" }, + { + "name": "replication_factor", + "type": "int" + }, + { + "name": "topic_configurations", + "type": { + "type": "map", + "values": "string" + } + }, { "name": "data_files", "type": { diff --git a/karapace/backup/backends/v3/backend.py b/karapace/backup/backends/v3/backend.py index 8564ec054..827eb28f0 100644 --- a/karapace/backup/backends/v3/backend.py +++ b/karapace/backup/backends/v3/backend.py @@ -18,7 +18,7 @@ from karapace.utils import assert_never from karapace.version import __version__ from pathlib import Path -from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Sequence, TypeVar +from typing import Callable, ContextManager, Final, Generator, IO, Iterator, Mapping, Sequence, TypeVar from typing_extensions import TypeAlias import datetime @@ -127,8 +127,10 @@ def read(self, path: Path, topic_name: str) -> Iterator[Instruction]: raise UnknownChecksumAlgorithm("Tried restoring from a backup with an unknown checksum algorithm.") yield RestoreTopic( - name=topic_name, + topic_name=topic_name, partition_count=metadata.partition_count, + replication_factor=metadata.replication_factor, + topic_configs=metadata.topic_configurations, ) for data_file in metadata.data_files: @@ -297,6 +299,8 @@ def store_metadata( started_at: datetime.datetime, finished_at: datetime.datetime, partition_count: int, + replication_factor: int, + topic_configurations: Mapping[str, str], data_files: Sequence[DataFile], ) -> None: assert isinstance(path, Path) @@ -321,6 +325,8 @@ def store_metadata( topic_name=topic_name, topic_id=topic_id, partition_count=partition_count, + replication_factor=replication_factor, + topic_configurations=topic_configurations, checksum_algorithm=ChecksumAlgorithm.xxhash3_64_be, data_files=tuple(data_files), ), diff --git a/karapace/backup/backends/v3/schema.py b/karapace/backup/backends/v3/schema.py index 68f7cadf9..3e42ad648 100644 --- a/karapace/backup/backends/v3/schema.py +++ b/karapace/backup/backends/v3/schema.py @@ -7,7 +7,7 @@ from dataclasses import field from karapace.avro_dataclasses.models import AvroModel from karapace.dataclasses import default_dataclass -from typing import Optional, Tuple +from typing import Mapping, Optional, Tuple import datetime import enum @@ -54,6 +54,8 @@ class Metadata(AvroModel): topic_name: str topic_id: Optional[uuid.UUID] partition_count: int = field(metadata={"type": "int"}) + replication_factor: int = field(metadata={"type": "int"}) + topic_configurations: Mapping[str, str] data_files: Tuple[DataFile, ...] checksum_algorithm: ChecksumAlgorithm = ChecksumAlgorithm.unknown diff --git a/karapace/backup/backends/writer.py b/karapace/backup/backends/writer.py index 6b083b52b..052773876 100644 --- a/karapace/backup/backends/writer.py +++ b/karapace/backup/backends/writer.py @@ -7,7 +7,7 @@ from kafka.consumer.fetcher import ConsumerRecord from karapace.backup.safe_writer import bytes_writer, str_writer from pathlib import Path -from typing import ContextManager, Generic, IO, Iterator, Literal, Sequence, TypeVar +from typing import ContextManager, Generic, IO, Iterator, Literal, Mapping, Sequence, TypeVar from typing_extensions import TypeAlias import abc @@ -82,6 +82,8 @@ def store_metadata( started_at: datetime.datetime, finished_at: datetime.datetime, partition_count: int, + replication_factor: int, + topic_configurations: Mapping[str, str], data_files: Sequence[F], ) -> None: """ diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index 6506ead9d..d893e8330 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -40,6 +40,12 @@ def parse_args() -> argparse.Namespace: p.add_argument("--poll-timeout", help=PollTimeout.__doc__, type=PollTimeout, default=PollTimeout.default()) parser_get.add_argument("--use-format-v3", action="store_true", help="Use experimental V3 backup format.") + parser_get.add_argument( + "--replication-factor", + help="Use a specified replication factor instead of retrieving it from the current topic state. " + "Required for V3 backup format.", + type=int, + ) parser_verify.add_argument( "--level", @@ -71,6 +77,7 @@ def dispatch(args: argparse.Namespace) -> None: version=api.BackupVersion.V3 if args.use_format_v3 else api.BackupVersion.V2, poll_timeout=args.poll_timeout, overwrite=args.overwrite, + replication_factor=args.replication_factor, ) elif args.command == "inspect": api.inspect(location) diff --git a/karapace/backup/errors.py b/karapace/backup/errors.py index f1f8e0e79..ec739d6ef 100644 --- a/karapace/backup/errors.py +++ b/karapace/backup/errors.py @@ -5,7 +5,7 @@ from kafka.structs import TopicPartition from karapace.backup.poll_timeout import PollTimeout -__all__ = ["BackupError", "PartitionCountError", "StaleConsumerError"] +__all__ = ["BackupError", "BackupTopicAlreadyExists", "EmptyPartition", "PartitionCountError", "StaleConsumerError"] class BackupError(Exception): @@ -20,6 +20,10 @@ class PartitionCountError(BackupError): pass +class BackupTopicAlreadyExists(BackupError): + pass + + class StaleConsumerError(BackupError, RuntimeError): """Raised when the backup consumer does not make any progress and has not reached the last record in the topic.""" diff --git a/karapace/backup/topic_configurations.py b/karapace/backup/topic_configurations.py new file mode 100644 index 000000000..43f507155 --- /dev/null +++ b/karapace/backup/topic_configurations.py @@ -0,0 +1,64 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from enum import Enum +from kafka import KafkaAdminClient +from kafka.admin import ConfigResource, ConfigResourceType +from kafka.errors import for_code +from kafka.protocol.admin import DescribeConfigsRequest +from typing import Container, Dict, Final + + +class ConfigSource(int, Enum): + UNKNOWN = 0 + TOPIC_CONFIG = 1 + DYNAMIC_BROKER_CONFIG = 2 + DYNAMIC_DEFAULT_BROKER_CONFIG = 3 + STATIC_BROKER_CONFIG = 4 + DEFAULT_CONFIG = 5 + DYNAMIC_BROKER_LOGGER_CONFIG = 6 + + +ALL_CONFIG_SOURCES: Final = {item.value for item in ConfigSource.__members__.values()} + + +DEFAULT_CONFIGS: Final = [ + "cleanup.policy", + "min.insync.replicas", + "retention.bytes", + "retention.ms", +] + + +def get_topic_configurations( + admin: KafkaAdminClient, + topic_name: str, + config_source_filter: Container[ConfigSource] = (), +) -> Dict[str, str]: + """Get configurations of the specified topic. The following configurations will be retrieved by default: + - `cleanup.policy` + - `min.insync.replicas` + - `retention.bytes` + - `retention.ms` + + :param admin: Kafka admin client + :param topic_name: get configurations for this topic + :param config_source_filter: returns all the configurations that match the sources specified, + plus the default configurations. If empty, returns only the default configurations. + """ + if admin._matching_api_version(DescribeConfigsRequest) == 0: # pylint: disable=protected-access + raise NotImplementedError("Broker version is not supported") + req_cfgs = [ConfigResource(ConfigResourceType.TOPIC, topic_name)] + cfgs = admin.describe_configs(req_cfgs) + assert len(cfgs) == 1 + assert len(cfgs[0].resources) == 1 + err, _, _, _, config_values = cfgs[0].resources[0] + if err != 0: + raise for_code(err) + topic_config = {} + for cv in config_values: + name, val, _, config_source, _, _ = cv + if name in DEFAULT_CONFIGS or (config_source in config_source_filter): + topic_config[name] = val + return topic_config diff --git a/karapace/client.py b/karapace/client.py index 6706c8db4..3bd97518b 100644 --- a/karapace/client.py +++ b/karapace/client.py @@ -5,9 +5,8 @@ See LICENSE for details """ from aiohttp import BasicAuth, ClientSession -from collections.abc import Mapping from karapace.typing import JsonData -from typing import Awaitable, Callable, Optional, Union +from typing import Awaitable, Callable, Mapping, Optional, Union from urllib.parse import urljoin import logging diff --git a/tests/integration/backup/test_get_topic_configurations.py b/tests/integration/backup/test_get_topic_configurations.py new file mode 100644 index 000000000..49ef6251f --- /dev/null +++ b/tests/integration/backup/test_get_topic_configurations.py @@ -0,0 +1,86 @@ +""" +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from kafka import KafkaAdminClient +from kafka.admin import ConfigResource, ConfigResourceType, NewTopic +from karapace.backup.topic_configurations import ALL_CONFIG_SOURCES, ConfigSource, DEFAULT_CONFIGS, get_topic_configurations +from karapace.constants import TOPIC_CREATION_TIMEOUT_MS +from typing import Dict + +import pytest +import secrets + + +@pytest.fixture(scope="function", name="new_topic") +def topic_fixture(admin_client: KafkaAdminClient) -> NewTopic: + new_topic = NewTopic(secrets.token_hex(4), 1, 1) + admin_client.create_topics([new_topic], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) + try: + yield new_topic + finally: + admin_client.delete_topics([new_topic.name], timeout_ms=TOPIC_CREATION_TIMEOUT_MS) + + +class TestTopicConfiguration: + @pytest.mark.parametrize("custom_topic_configs", [{}, {"max.message.bytes": "1234"}]) + def test_get_custom_topic_configurations( + self, + new_topic: NewTopic, + admin_client: KafkaAdminClient, + custom_topic_configs: Dict[str, str], + ) -> None: + admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + + retrieved_configs = get_topic_configurations( + admin_client, new_topic.name, config_source_filter={ConfigSource.TOPIC_CONFIG} + ) + + # Verify that default configs are retrieved, and then remove them + for default_config in DEFAULT_CONFIGS: + assert default_config in retrieved_configs + del retrieved_configs[default_config] + + # Verify that all custom topic configs are correctly retrieved and no other config is present + assert retrieved_configs == custom_topic_configs + + def test_get_only_default_topic_configurations( + self, + new_topic: NewTopic, + admin_client: KafkaAdminClient, + ) -> None: + custom_topic_configs = {"segment.bytes": "7890"} + admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + + retrieved_configs = get_topic_configurations(admin_client, new_topic.name, config_source_filter=()) + + # Verify that default configs are retrieved, and then remove them + for default_config in DEFAULT_CONFIGS: + assert default_config in retrieved_configs + del retrieved_configs[default_config] + + # Verify that only default configs are retrieved + assert retrieved_configs == {} + + def test_get_all_topic_configurations( + self, + new_topic: NewTopic, + admin_client: KafkaAdminClient, + ) -> None: + custom_topic_configs = {"flush.ms": "999"} + admin_client.alter_configs([ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs=custom_topic_configs)]) + + retrieved_configs = get_topic_configurations(admin_client, new_topic.name, config_source_filter=ALL_CONFIG_SOURCES) + + # Verify that default configs are retrieved, and then remove them + for default_config in DEFAULT_CONFIGS: + assert default_config in retrieved_configs + del retrieved_configs[default_config] + + # Verify that all custom topic configs are correctly retrieved, and then remove them + for custom_config_key, custom_config_value in custom_topic_configs.items(): + assert retrieved_configs[custom_config_key] == custom_config_value + del retrieved_configs[custom_config_key] + + # Verify that also other configs are retrieved + assert retrieved_configs diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index 4e06868a3..bd4f87ae2 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -6,13 +6,14 @@ from dataclasses import fields from kafka import KafkaAdminClient, KafkaProducer, TopicPartition -from kafka.admin import NewTopic +from kafka.admin import ConfigResource, ConfigResourceType, NewTopic from kafka.consumer.fetcher import ConsumerRecord from kafka.errors import UnknownTopicOrPartitionError from karapace.backup.api import _consume_records from karapace.backup.backends.v3.readers import read_metadata from karapace.backup.backends.v3.schema import Metadata from karapace.backup.poll_timeout import PollTimeout +from karapace.backup.topic_configurations import ConfigSource, get_topic_configurations from karapace.config import Config, set_config_defaults from karapace.constants import TOPIC_CREATION_TIMEOUT_MS from karapace.kafka_utils import kafka_admin_from_config, kafka_consumer_from_config, kafka_producer_from_config @@ -107,6 +108,9 @@ def test_roundtrip_from_kafka_state( karapace_config: Config, ) -> None: # Populate the test topic. + admin_client.alter_configs( + [ConfigResource(ConfigResourceType.TOPIC, new_topic.name, configs={"max.message.bytes": "999"})] + ) producer.send( new_topic.name, key=b"bar", @@ -127,6 +131,8 @@ def test_roundtrip_from_kafka_state( ).add_errback(_raise) producer.flush() + topic_config = get_topic_configurations(admin_client, new_topic.name, {ConfigSource.TOPIC_CONFIG}) + # Execute backup creation. subprocess.run( [ @@ -137,6 +143,7 @@ def test_roundtrip_from_kafka_state( str(config_file), "--topic", new_topic.name, + "--replication-factor=1", "--location", str(tmp_path), ], @@ -183,6 +190,8 @@ def test_roundtrip_from_kafka_state( poll_timeout=PollTimeout.default(), ) + assert topic_config == get_topic_configurations(admin_client, new_topic.name, {ConfigSource.TOPIC_CONFIG}) + # First record. assert isinstance(first_record, ConsumerRecord) assert first_record.topic == new_topic.name @@ -216,7 +225,7 @@ def test_roundtrip_from_file( config_file: Path, admin_client: KafkaAdminClient, ) -> None: - topic_name = "6595c9c2" + topic_name = "0cdc85dc" backup_directory = Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" metadata_path = backup_directory / f"{topic_name}.metadata" with metadata_path.open("rb") as buffer: @@ -258,6 +267,7 @@ def test_roundtrip_from_file( str(config_file), "--topic", topic_name, + "--replication-factor=1", "--location", str(tmp_path), ], @@ -288,6 +298,9 @@ def test_roundtrip_from_file( # Verify new version matches current version of Karapace. assert new_metadata.tool_version == __version__ + # Verify replication factor is correctly propagated. + assert new_metadata.replication_factor == 1 + # Verify all fields other than timings and version match exactly. for field in fields(Metadata): if field.name in {"started_at", "finished_at", "tool_version"}: @@ -311,7 +324,7 @@ def no_color_env() -> dict[str, str]: class TestInspect: def test_can_inspect_v3(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "6595c9c2.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "0cdc85dc.metadata" ) cp = subprocess.run( @@ -331,16 +344,25 @@ def test_can_inspect_v3(self) -> None: assert json.loads(cp.stdout) == { "version": 3, "tool_name": "karapace", - "tool_version": "3.4.6-67-g26d38c0", - "started_at": "2023-05-12T14:24:45.932000+00:00", - "finished_at": "2023-05-12T14:24:46.286000+00:00", - "topic_name": "6595c9c2", + "tool_version": "3.5.0-31-g15440ce", + "started_at": "2023-05-31T11:42:21.116000+00:00", + "finished_at": "2023-05-31T11:42:21.762000+00:00", + "topic_name": "0cdc85dc", "topic_id": None, "partition_count": 1, + "record_count": 2, + "replication_factor": 1, + "topic_configurations": { + "min.insync.replicas": "1", + "cleanup.policy": "delete", + "retention.ms": "604800000", + "max.message.bytes": "999", + "retention.bytes": "-1", + }, "checksum_algorithm": "xxhash3_64_be", "data_files": [ { - "filename": "6595c9c2:0.data", + "filename": "0cdc85dc:0.data", "partition": 0, "checksum_hex": "f414f504a8e49313", "record_count": 2, @@ -374,18 +396,21 @@ def test_can_inspect_v3_with_future_checksum_algorithm(self) -> None: assert json.loads(cp.stdout) == { "version": 3, "tool_name": "karapace", - "tool_version": "3.4.6-67-g26d38c0", - "started_at": "2023-05-30T14:44:24.841000+00:00", - "finished_at": "2023-05-30T14:44:25.168000+00:00", + "tool_version": "3.5.0-27-g9535c1d", + "started_at": "2023-05-31T12:01:01.165000+00:00", + "finished_at": "2023-05-31T12:01:01.165000+00:00", "topic_name": "a5f7a413", "topic_id": None, "partition_count": 1, + "record_count": 2, + "replication_factor": 2, + "topic_configurations": {"cleanup.policy": "compact", "min.insync.replicas": "2"}, "checksum_algorithm": "unknown", "data_files": [ { "filename": "a5f7a413:0.data", "partition": 0, - "checksum_hex": "f414f504a8e49313", + "checksum_hex": "66343134663530346138653439333133", "record_count": 2, "start_offset": 0, "end_offset": 1, @@ -435,7 +460,7 @@ def test_can_inspect_v1(self) -> None: class TestVerify: def test_can_verify_file_integrity(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "6595c9c2.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "0cdc85dc.metadata" ) cp = subprocess.run( @@ -454,14 +479,14 @@ def test_can_verify_file_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 6595c9c2:0.data is intact. + Integrity of 0cdc85dc:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) def test_can_verify_record_integrity(self) -> None: metadata_path = ( - Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "6595c9c2.metadata" + Path(__file__).parent.parent.resolve() / "test_data" / "backup_v3_single_partition" / "0cdc85dc.metadata" ) cp = subprocess.run( @@ -480,7 +505,7 @@ def test_can_verify_record_integrity(self) -> None: assert cp.stderr == b"" assert cp.stdout.decode() == textwrap.dedent( """\ - Integrity of 6595c9c2:0.data is intact. + Integrity of 0cdc85dc:0.data is intact. ✅ Verified 1 data files in backup OK. """ ) @@ -510,6 +535,7 @@ def test_can_verify_file_integrity_from_large_topic( "--use-format-v3", f"--config={config_file!s}", f"--topic={new_topic.name!s}", + "--replication-factor=1", f"--location={tmp_path!s}", ], capture_output=True, @@ -563,6 +589,7 @@ def test_can_verify_record_integrity_from_large_topic( "--use-format-v3", f"--config={config_file!s}", f"--topic={new_topic.name}", + "--replication-factor=1", f"--location={tmp_path!s}", ], capture_output=True, diff --git a/tests/integration/test_data/backup_v3_corrupt_last_record_bit_flipped_no_checkpoints/a-topic.metadata b/tests/integration/test_data/backup_v3_corrupt_last_record_bit_flipped_no_checkpoints/a-topic.metadata index 78ce5da9d41d84468ea8097cab938d2f62a8d9e6..64478fe46fcc99874551bd6c730cf9190f39cd76 100644 GIT binary patch literal 139 zcmdN7Gv;DoU}$C&$WAOuEJ#dFRWjBy)icmFGS^MFG&MF&HcYv(`P;0%BnZjGm#ABk zUyzy1#KgiPlbn;9m{(e$SCF5RnOw=2oSzFcp+qe=GfyuwuedTVS+6Ly04SAM%w)vC dqyW`wWoTrqmy%eL_>GY#F)10yV*)yX2>@0pEJFYQ literal 95 zcmdN7Gv;DoUfj3^x1OQeaCnf*@ literal 88 zcmZQzV9;P-OUut^U`t9YViG9M&rQ|MPOTIHGs+TkN>drWF`iqp;W!Ty0}*O}fYtIc F0RUg>8`b~- diff --git a/tests/integration/test_data/backup_v3_future_algorithm/a5f7a413.metadata b/tests/integration/test_data/backup_v3_future_algorithm/a5f7a413.metadata index 847db9c510e63c4d4117625989e839de43bea500..a18557815e2e32c5e1b225d19b2e55deef5aea50 100644 GIT binary patch literal 148 zcmdN7Gv;DoVCZ2J$WAOuEJ#dFRWjBy)icmFGS^MFG&MF&HcYv-`P;0%BnZhOkZ77_ zo@iod%*4dPB9okxnwVEwpjVKelbKw}mztQ6O$Z_ZDpXBl30?+An--xE6a)}lZ9Crm>5_9!?|oCyGhuqYk? literal 112 zcmZQzU=U(pOG+$aU`xx-XJA;gc;8VTCI$wEFo*z1ibbqAKQ~o3BQ-H4wMYrXR)DY- d$`W%*Qzi0CGE$46(t2QNun3AG2Bt$`lK~H^Bl`dV diff --git a/tests/integration/test_data/backup_v3_single_partition/0cdc85dc.metadata b/tests/integration/test_data/backup_v3_single_partition/0cdc85dc.metadata new file mode 100644 index 0000000000000000000000000000000000000000..24f2ed642e300ff1ff084207ca07ec4279af5f8d GIT binary patch literal 203 zcmdN7Gv;DoU^vVskeyhRSdf^Us${HZs%M~UY^a-VXli0&kevGC+S{3ZNi7dSB#VGS za!Rs=X-YB^6BCzOZf2fdW?pe+Ub0?MYC%qBa$+%)p-gg4YGPh#fnGsAP^6M4B{e6t zBvqmaNaU4d=I80<77Ljfm{=HqfKqN^g None: second_send, ) = tuple(reader.read(tmp_file, "some-topic")) - assert restore_topic == RestoreTopic(name="some-topic", partition_count=1) + assert restore_topic == RestoreTopicLegacy(topic_name="some-topic", partition_count=1) # First message. assert isinstance(first_send, ProducerSend) @@ -68,4 +68,6 @@ def test_yields_single_restore_topic_for_null(self, tmp_file: Path) -> None: reader = get_reader() tmp_file.write_text("null") - assert tuple(reader.read(tmp_file, "some-topic")) == (RestoreTopic(name="some-topic", partition_count=1),) + assert tuple(reader.read(tmp_file, "some-topic")) == ( + RestoreTopicLegacy(topic_name="some-topic", partition_count=1), + ) diff --git a/tests/unit/backup/backends/test_v2.py b/tests/unit/backup/backends/test_v2.py index 5fe9a2ebb..b38f83469 100644 --- a/tests/unit/backup/backends/test_v2.py +++ b/tests/unit/backup/backends/test_v2.py @@ -6,7 +6,7 @@ from functools import partial from kafka.consumer.fetcher import ConsumerRecord -from karapace.backup.backends.reader import ProducerSend, RestoreTopic +from karapace.backup.backends.reader import ProducerSend, RestoreTopicLegacy from karapace.backup.backends.v2 import AnonymizeAvroWriter, SchemaBackupV2Reader, SchemaBackupV2Writer from karapace.backup.encoders import encode_key, encode_value from karapace.key_format import KeyFormatter @@ -109,6 +109,8 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: topic_id=None, started_at=datetime.datetime.now(datetime.timezone.utc), finished_at=datetime.datetime.now(datetime.timezone.utc), + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -120,7 +122,7 @@ def test_schema_backup_v2_roundtrip(tmp_path: Path) -> None: second_send, ) = reader.read(backup_path, topic_name) - assert restore_topic == RestoreTopic(name=topic_name, partition_count=1) + assert restore_topic == RestoreTopicLegacy(topic_name=topic_name, partition_count=1) # First message. assert isinstance(first_send, ProducerSend) @@ -258,6 +260,8 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: topic_id=None, started_at=datetime.datetime.now(datetime.timezone.utc), finished_at=datetime.datetime.now(datetime.timezone.utc), + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -273,7 +277,7 @@ def test_anonymize_avro_roundtrip(tmp_path: Path) -> None: second_send, ) = reader.read(backup_path, topic_name) - assert restore_topic == RestoreTopic(name=topic_name, partition_count=1) + assert restore_topic == RestoreTopicLegacy(topic_name=topic_name, partition_count=1) # First message. assert isinstance(first_send, ProducerSend) @@ -326,4 +330,4 @@ def test_yields_restore_topic_for_empty_file(tmp_file: Path) -> None: reader = get_reader() tmp_file.write_text("/V2\n") instructions = tuple(reader.read(tmp_file, "a-topic")) - assert instructions == (RestoreTopic(name="a-topic", partition_count=1),) + assert instructions == (RestoreTopicLegacy(topic_name="a-topic", partition_count=1),) diff --git a/tests/unit/backup/backends/v3/test_backend.py b/tests/unit/backup/backends/v3/test_backend.py index 084407ef1..4f8562311 100644 --- a/tests/unit/backup/backends/v3/test_backend.py +++ b/tests/unit/backup/backends/v3/test_backend.py @@ -62,6 +62,7 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: serialized_header_size=None, ), ) + topic_configurations = {"max.message.bytes": "1024"} # Write backup to files. backup_writer = SchemaBackupV3Writer( @@ -87,6 +88,8 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: topic_id=None, started_at=started_at, finished_at=finished_at, + replication_factor=2, + topic_configurations=topic_configurations, data_files=(data_file,), partition_count=1, ) @@ -113,8 +116,10 @@ def test_writer_reader_roundtrip(tmp_path: Path) -> None: assert instructions == ( RestoreTopic( - name=topic_name, + topic_name=topic_name, partition_count=1, + replication_factor=2, + topic_configs={"max.message.bytes": "1024"}, ), ProducerSend( topic_name=topic_name, @@ -215,6 +220,8 @@ def test_reader_raises_invalid_checksum(tmp_path: Path) -> None: topic_id=None, started_at=started_at, finished_at=finished_at, + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -273,6 +280,8 @@ def test_reader_raises_invalid_checksum_for_corruption_in_last_record(tmp_path: topic_id=None, started_at=started_at, finished_at=finished_at, + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -327,6 +336,8 @@ def test_reader_raises_too_many_records(tmp_path: Path) -> None: topic_id=None, started_at=started_at, finished_at=finished_at, + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -381,6 +392,8 @@ def test_reader_raises_too_few_records(tmp_path: Path) -> None: topic_id=None, started_at=started_at, finished_at=finished_at, + replication_factor=1, + topic_configurations={}, data_files=(data_file,), partition_count=1, ) @@ -434,6 +447,8 @@ def test_reader_raises_offset_mismatch_for_first_record(tmp_path: Path) -> None: finished_at=finished_at, data_files=(data_file,), partition_count=1, + topic_configurations={}, + replication_factor=1, ) # Read backup into restore instructions. @@ -485,6 +500,8 @@ def test_reader_raises_offset_mismatch_for_last_record(tmp_path: Path) -> None: finished_at=finished_at, data_files=(data_file,), partition_count=1, + topic_configurations={}, + replication_factor=1, ) # Read backup into restore instructions. diff --git a/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic.metadata b/tests/unit/backup/backends/v3/test_data/backup_v3_future_algorithm/a-topic.metadata index e1298b50616ee271dceb252df210ae88740ea1d6..19547bccb96a550b8e5a60efa905a49147e45205 100644 GIT binary patch literal 139 zcmdN7Gv;DoU}$C&$WAOuEJ#dFRWjBy)icmFGS^MFG&MF&HcYv)`P;0%BnZjGm#ABk zUyzy1#KgiPlbn;9m{(e$SCF5RnOw=2oSzFcp+qe=GfyuwuedTVS+6Ly04SAM%w)vC eqyW`wWoTrqmy%eL_>GY#F)10yV`5-pU;qG94JBnZjEm#ABk nUyzy1#KfckWm*{;8SAAamLz^-6j!?|oCyGhuqYk? literal 65 zcmZQzU=U(pOUut^U`t9YVi38rYw2+wCI$uub%ZFBKyiL= diff --git a/tests/unit/backup/backends/v3/test_serialisation.py b/tests/unit/backup/backends/v3/test_serialisation.py index 4dfb1f4ee..fe8872911 100644 --- a/tests/unit/backup/backends/v3/test_serialisation.py +++ b/tests/unit/backup/backends/v3/test_serialisation.py @@ -181,6 +181,8 @@ def test_metadata_roundtrip(self, buffer: IO[bytes]) -> None: topic_name="some-topic", topic_id=uuid.uuid4(), partition_count=1, + replication_factor=2, + topic_configurations={"cleanup.policy": "compact", "min.insync.replicas": "2"}, data_files=( DataFile( filename="some-topic:0.data", diff --git a/tests/unit/backup/test_api.py b/tests/unit/backup/test_api.py index 56e5c9b82..e82e50e19 100644 --- a/tests/unit/backup/test_api.py +++ b/tests/unit/backup/test_api.py @@ -9,7 +9,15 @@ from kafka.errors import KafkaError, TopicAlreadyExistsError from kafka.structs import PartitionMetadata from karapace import config -from karapace.backup.api import _admin, _consumer, _maybe_create_topic, _producer, BackupVersion +from karapace.backup.api import ( + _admin, + _consumer, + _handle_restore_topic, + _handle_restore_topic_legacy, + _maybe_create_topic, + _producer, +) +from karapace.backup.backends.reader import RestoreTopic, RestoreTopicLegacy from karapace.backup.errors import PartitionCountError from karapace.config import Config from karapace.constants import DEFAULT_SCHEMA_TOPIC, TOPIC_CREATION_TIMEOUT_MS @@ -59,11 +67,12 @@ def test_reraises_unknown_exceptions( assert sleep_mock.call_count == 0 # proof that we did not retry -class TestMaybeCreateTopic: +class TestHandleRestoreTopic: @patch_admin_new def test_calls_admin_create_topics(self, admin_new: MagicMock) -> None: create_topics: MagicMock = admin_new.return_value.create_topics - _maybe_create_topic(config.DEFAULTS, DEFAULT_SCHEMA_TOPIC, BackupVersion.V1) + topic_configs = {"cleanup.policy": "compact"} + _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs=topic_configs) create_topics.assert_called_once_with(mock.ANY, timeout_ms=TOPIC_CREATION_TIMEOUT_MS) ((new_topic,),) = create_topics.call_args.args @@ -71,13 +80,13 @@ def test_calls_admin_create_topics(self, admin_new: MagicMock) -> None: assert new_topic.name == DEFAULT_SCHEMA_TOPIC assert new_topic.num_partitions == 1 assert new_topic.replication_factor == config.DEFAULTS["replication_factor"] - assert new_topic.topic_configs == {"cleanup.policy": "compact"} + assert new_topic.topic_configs == topic_configs @patch_admin_new def test_gracefully_handles_topic_already_exists_error(self, admin_new: MagicMock) -> None: create_topics: MagicMock = admin_new.return_value.create_topics create_topics.side_effect = TopicAlreadyExistsError() - _maybe_create_topic(config.DEFAULTS, DEFAULT_SCHEMA_TOPIC, BackupVersion.V2) + _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) create_topics.assert_called_once() @patch_admin_new @@ -86,20 +95,19 @@ def test_retries_for_kafka_errors(self, admin_new: MagicMock) -> None: create_topics.side_effect = [KafkaError("1"), KafkaError("2"), None] with mock.patch("time.sleep", autospec=True): - _maybe_create_topic(config.DEFAULTS, DEFAULT_SCHEMA_TOPIC, BackupVersion.V2) + _maybe_create_topic(DEFAULT_SCHEMA_TOPIC, config=config.DEFAULTS, replication_factor=1, topic_configs={}) assert create_topics.call_count == 3 - @pytest.mark.parametrize("version", (BackupVersion.V1, BackupVersion.V2)) @patch_admin_new def test_noop_for_custom_name_on_legacy_versions( self, admin_new: MagicMock, - version: BackupVersion, ) -> None: create_topics: MagicMock = admin_new.return_value.create_topics assert "custom-name" != DEFAULT_SCHEMA_TOPIC - _maybe_create_topic(config.DEFAULTS, "custom-name", version) + instruction = RestoreTopicLegacy(topic_name="custom-name", partition_count=1) + _handle_restore_topic_legacy(instruction, config.DEFAULTS) create_topics.assert_not_called() @patch_admin_new @@ -110,15 +118,19 @@ def test_allows_custom_name_on_v3( create_topics: MagicMock = admin_new.return_value.create_topics topic_name = "custom-name" assert topic_name != DEFAULT_SCHEMA_TOPIC - _maybe_create_topic(config.DEFAULTS, "custom-name", BackupVersion.V3) + topic_configs = {"segment.bytes": "1000"} + instruction = RestoreTopic( + topic_name="custom-name", partition_count=1, replication_factor=2, topic_configs=topic_configs + ) + _handle_restore_topic(instruction, config.DEFAULTS) create_topics.assert_called_once_with(mock.ANY, timeout_ms=TOPIC_CREATION_TIMEOUT_MS) ((new_topic,),) = create_topics.call_args.args assert isinstance(new_topic, NewTopic) assert new_topic.name == topic_name assert new_topic.num_partitions == 1 - assert new_topic.replication_factor == config.DEFAULTS["replication_factor"] - assert new_topic.topic_configs == {"cleanup.policy": "compact"} + assert new_topic.replication_factor == 2 + assert new_topic.topic_configs == topic_configs class TestClients: