
fix: corrected backup tests
jjaakola-aiven committed Nov 19, 2024
1 parent 4b09f9e commit 60a8086
Showing 9 changed files with 133 additions and 152 deletions.
src/karapace/backup/api.py (6 additions & 8 deletions)

@@ -112,7 +112,7 @@ def normalize_topic_name(
     topic_option: str | None,
     config: Config,
 ) -> TopicName:
-    return TopicName(topic_option or config["topic_name"])
+    return TopicName(topic_option or config.topic_name)
 
 
 class BackupVersion(Enum):
@@ -354,17 +354,17 @@ def _handle_restore_topic_legacy(
 ) -> None:
     if skip_topic_creation:
         return
-    if config["topic_name"] != instruction.topic_name:
+    if config.topic_name != instruction.topic_name:
         LOG.warning(
             "Not creating topic, because the name %r from the config and the name %r from the CLI differ.",
-            config["topic_name"],
+            config.topic_name,
             instruction.topic_name,
         )
         return
     _maybe_create_topic(
         config=config,
         name=instruction.topic_name,
-        replication_factor=config["replication_factor"],
+        replication_factor=config.replication_factor,
         topic_configs={"cleanup.policy": "compact"},
     )
@@ -441,9 +441,7 @@ def restore_backup(
     see Kafka implementation.
     :raises BackupTopicAlreadyExists: if backup version is V3 and topic already exists
     """
-    key_formatter = (
-        KeyFormatter() if topic_name == constants.DEFAULT_SCHEMA_TOPIC or config.get("force_key_correction", False) else None
-    )
+    key_formatter = KeyFormatter() if topic_name == constants.DEFAULT_SCHEMA_TOPIC or config.force_key_correction else None
 
     backup_version = BackupVersion.identify(backup_location)
     backend_type = backup_version.reader
@@ -591,7 +589,7 @@ def create_backup(
         started_at=start_time,
         finished_at=end_time,
         partition_count=1,
-        replication_factor=replication_factor if replication_factor is not None else config["replication_factor"],
+        replication_factor=replication_factor if replication_factor is not None else config.replication_factor,
         topic_configurations=topic_configurations,
         data_files=[data_file] if data_file else [],
     )
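
The pattern above recurs through the whole commit: dict-style `config["key"]` lookups become attribute access on the pydantic-based `Config`. Because every settings field declares a default, the defensive `config.get("force_key_correction", False)` also collapses to plain `config.force_key_correction`. A minimal sketch of why, using a toy class rather than Karapace's real `Config` (assumes the pydantic-settings package is available):

```python
# Toy stand-in for Karapace's Config: defaults live on the field declarations,
# so attribute access can never miss the way a dict lookup could.
from pydantic_settings import BaseSettings


class DemoConfig(BaseSettings):
    topic_name: str = "_schemas"
    force_key_correction: bool = False


config = DemoConfig()

# Old style needed a fallback for absent keys:
#     config.get("force_key_correction", False)
# New style always resolves, applying the declared default:
assert config.force_key_correction is False
assert config.topic_name == "_schemas"
```
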
src/karapace/backup/cli.py (2 additions & 3 deletions)

@@ -12,7 +12,7 @@
 from aiokafka.errors import BrokerResponseError
 from collections.abc import Iterator
 from karapace.backup.api import VerifyLevel
-from karapace.config import Config, read_config
+from karapace.config import Config, read_env_file
 
 import argparse
 import contextlib
@@ -89,8 +89,7 @@ def parse_args() -> argparse.Namespace:
 
 
 def get_config(args: argparse.Namespace) -> Config:
-    with open(args.config) as buffer:
-        return read_config(buffer)
+    return read_env_file(args.config)
 
 
 def dispatch(args: argparse.Namespace) -> None:
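
`get_config` no longer opens and JSON-decodes the `--config` file; the path is handed straight to `read_env_file`, so the file on disk must now be dotenv-formatted rather than JSON. A hedged sketch of the migration; the key names follow the `Config` field names, and any environment-variable prefix Karapace may apply is not shown:

```python
# Before this commit the CLI config file was JSON, e.g.:
#     {"bootstrap_uri": "kafka:9092", "topic_name": "_schemas"}
# After it, the same settings are written dotenv-style:
#     bootstrap_uri=kafka:9092
#     topic_name=_schemas
from pathlib import Path

env_file = Path("karapace.env")  # illustrative file name
env_file.write_text("bootstrap_uri=kafka:9092\ntopic_name=_schemas\n")

# Equivalent of get_config(args) after this commit:
#     config = read_env_file(str(env_file))
```
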
src/karapace/config.py (10 additions & 6 deletions)

@@ -37,6 +37,7 @@ class Config(BaseSettings):
     consumer_request_max_bytes: int = 67108864
     consumer_idle_disconnect_timeout: int = 0
     fetch_min_bytes: int = 1
+    force_key_correction: bool = False
     group_id: str = "schema-registry"
     http_request_max_size: int | None = None
     host: str = "127.0.0.1"
@@ -249,12 +250,15 @@ def write_env_file(dot_env_path: Path, config: Config) -> None:
     dot_env_path.write_text(config.to_env_str())
 
 
-def read_config(config_handler: IO) -> Config:
-    try:
-        config = json_decode(config_handler)
-    except JSONDecodeError as ex:
-        raise InvalidConfiguration("Configuration is not a valid JSON") from ex
-    return set_config_defaults(config)
+# def read_config(config_handler: IO) -> Config:
+#     try:
+#         config = json_decode(config_handler)
+#     except JSONDecodeError as ex:
+#         raise InvalidConfiguration("Configuration is not a valid JSON") from ex
+#     return set_config_defaults(config)
+def read_env_file(env_file_path: str) -> Config:
+    return Config(_env_file=env_file_path, _env_file_encoding="utf-8")
+
 
 
 def create_client_ssl_context(config: Config) -> ssl.SSLContext | None:
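
`read_env_file` delegates all parsing to pydantic: `BaseSettings.__init__` accepts `_env_file` and `_env_file_encoding` overrides that load field values from a dotenv file and coerce them to the declared types. A runnable sketch with a toy settings class standing in for Karapace's `Config` (assumes the pydantic-settings package):

```python
import tempfile

from pydantic_settings import BaseSettings


class DemoConfig(BaseSettings):
    bootstrap_uri: str = "127.0.0.1:9092"
    force_key_correction: bool = False


# Write a dotenv file the way read_env_file expects to find one on disk.
with tempfile.NamedTemporaryFile("w", suffix=".env", delete=False) as f:
    f.write("bootstrap_uri=kafka:9092\nforce_key_correction=true\n")

config = DemoConfig(_env_file=f.name, _env_file_encoding="utf-8")
assert config.bootstrap_uri == "kafka:9092"
assert config.force_key_correction is True  # "true" coerced to bool
```
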
src/karapace/kafka_utils.py (29 additions & 28 deletions)

@@ -13,35 +13,35 @@
 
 def kafka_admin_from_config(config: Config) -> KafkaAdminClient:
     return KafkaAdminClient(
-        bootstrap_servers=config["bootstrap_uri"],
-        client_id=config["client_id"],
-        security_protocol=config["security_protocol"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
+        bootstrap_servers=config.bootstrap_uri,
+        client_id=config.client_id,
+        security_protocol=config.security_protocol,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
     )
 
 
 @contextlib.contextmanager
 def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
     consumer = KafkaConsumer(
-        bootstrap_servers=config["bootstrap_uri"],
+        bootstrap_servers=config.bootstrap_uri,
         topic=topic,
         enable_auto_commit=False,
-        client_id=config["client_id"],
-        security_protocol=config["security_protocol"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
+        client_id=config.client_id,
+        security_protocol=config.security_protocol,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
         auto_offset_reset="earliest",
-        session_timeout_ms=config["session_timeout_ms"],
-        metadata_max_age_ms=config["metadata_max_age_ms"],
+        session_timeout_ms=config.session_timeout_ms,
+        metadata_max_age_ms=config.metadata_max_age_ms,
     )
     try:
         yield consumer
@@ -52,15 +52,16 @@ def kafka_consumer_from_config(config: Config, topic: str) -> Iterator[KafkaConsumer]:
 @contextlib.contextmanager
 def kafka_producer_from_config(config: Config) -> Iterator[KafkaProducer]:
     producer = KafkaProducer(
-        bootstrap_servers=config["bootstrap_uri"],
-        security_protocol=config["security_protocol"],
-        ssl_cafile=config["ssl_cafile"],
-        ssl_certfile=config["ssl_certfile"],
-        ssl_keyfile=config["ssl_keyfile"],
-        sasl_mechanism=config["sasl_mechanism"],
-        sasl_plain_username=config["sasl_plain_username"],
-        sasl_plain_password=config["sasl_plain_password"],
+        bootstrap_servers=config.bootstrap_uri,
+        security_protocol=config.security_protocol,
+        ssl_cafile=config.ssl_cafile,
+        ssl_certfile=config.ssl_certfile,
+        ssl_keyfile=config.ssl_keyfile,
+        sasl_mechanism=config.sasl_mechanism,
+        sasl_plain_username=config.sasl_plain_username,
+        sasl_plain_password=config.sasl_plain_password,
         retries=0,
+        session_timeout_ms=config.session_timeout_ms,
     )
     try:
         yield producer
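
Besides the attribute-access migration, note the one behavioural addition here: the producer now also receives `session_timeout_ms` from the config, which previously only the consumer did. All three factories share the same lifecycle shape: a generator-based context manager whose cleanup (collapsed out of this diff, presumably a `finally: client.close()`) runs whether or not the `with` body raises. A generic sketch of that pattern with placeholder names:

```python
import contextlib
from collections.abc import Iterator


class FakeClient:
    """Placeholder for KafkaConsumer/KafkaProducer in this sketch."""

    def close(self) -> None:
        print("closed")


@contextlib.contextmanager
def client_from_config() -> Iterator[FakeClient]:
    client = FakeClient()
    try:
        yield client
    finally:
        # Runs on normal exit and when the with-block raises.
        client.close()


with client_from_config() as client:
    pass  # use the client; close() is guaranteed either way
```
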
tests/integration/backup/test_avro_export.py (4 additions & 7 deletions)

@@ -7,7 +7,7 @@
 from karapace.backup import api
 from karapace.backup.api import BackupVersion
 from karapace.client import Client
-from karapace.config import set_config_defaults
+from karapace.config import Config
 from karapace.utils import json_encode
 from pathlib import Path
 from tests.integration.utils.cluster import RegistryDescription
@@ -110,12 +110,9 @@ async def test_export_anonymized_avro_schemas(
 
     # Get the backup
     export_location = tmp_path / "export.log"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
     api.create_backup(
         config=config,
         backup_location=export_location,
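
The test-side changes all follow one recipe: construct a default `Config()` and overwrite individual fields. The new `[0]` is worth noting: the fixture's `bootstrap_servers` is a list of broker addresses, while the `bootstrap_uri` field holds a single URI string. A sketch with a toy class (not the real fixture or `Config`):

```python
from pydantic_settings import BaseSettings


class DemoConfig(BaseSettings):
    bootstrap_uri: str = "127.0.0.1:9092"
    topic_name: str = "_schemas"


# Stand-in for kafka_servers.bootstrap_servers, which is a list.
fixture_bootstrap_servers = ["127.0.0.1:9092", "127.0.0.1:9093"]

config = DemoConfig()
config.bootstrap_uri = fixture_bootstrap_servers[0]  # field takes one URI, hence [0]
config.topic_name = "some-schemas-topic"
```
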
tests/integration/backup/test_legacy_backup.py (20 additions & 25 deletions)

@@ -10,7 +10,7 @@
 from karapace.backup.errors import StaleConsumerError
 from karapace.backup.poll_timeout import PollTimeout
 from karapace.client import Client
-from karapace.config import set_config_defaults
+from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka.common import KafkaError
 from karapace.kafka.consumer import KafkaConsumer
@@ -52,12 +52,10 @@ async def test_backup_get(
 
     # Get the backup
     backup_location = tmp_path / "schemas.log"
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     api.create_backup(
         config=config,
         backup_location=backup_location,
@@ -85,11 +83,9 @@ async def test_backup_restore_and_get_non_schema_topic(
 ) -> None:
     test_topic_name = new_random_name("non-schemas")
 
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+
     admin_client.new_topic(name=test_topic_name)
 
     # Restore from backup
@@ -154,13 +150,10 @@ async def test_backup_restore(
 ) -> None:
     subject = "subject-1"
     test_data_path = Path("tests/integration/test_data/")
-    config = set_config_defaults(
-        {
-            "bootstrap_uri": kafka_servers.bootstrap_servers,
-            "topic_name": registry_cluster.schemas_topic,
-            "force_key_correction": True,
-        }
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+    config.force_key_correction = True
 
     # Test basic restore functionality
     restore_location = test_data_path / f"test_restore_{backup_file_version}.log"
@@ -252,9 +245,10 @@ async def test_stale_consumer(
     tmp_path: Path,
 ) -> None:
     await insert_data(registry_async_client)
-    config = set_config_defaults(
-        {"bootstrap_uri": kafka_servers.bootstrap_servers, "topic_name": registry_cluster.schemas_topic}
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     with pytest.raises(StaleConsumerError) as e:
         # The proper way to test this would be with quotas by throttling our client to death while using a very short
         # poll timeout. However, we have no way to set up quotas because all Kafka clients available to us do not
@@ -278,9 +272,10 @@ async def test_message_error(
     tmp_path: Path,
 ) -> None:
     await insert_data(registry_async_client)
-    config = set_config_defaults(
-        {"bootstrap_uri": kafka_servers.bootstrap_servers, "topic_name": registry_cluster.schemas_topic}
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_servers.bootstrap_servers[0]
+    config.topic_name = registry_cluster.schemas_topic
+
     with pytest.raises(InvalidTopicError):
         with mock.patch(f"{KafkaConsumer.__module__}.{KafkaConsumer.__qualname__}.poll") as poll_mock:
             poll_mock.return_value = StubMessage(error=KafkaError(KafkaError.TOPIC_EXCEPTION))
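
The `config.force_key_correction = True` in `test_backup_restore` ties back to the `restore_backup` change in api.py above: the key formatter is applied when restoring into the default schemas topic, or when the flag forces it for another topic. A small sketch of that predicate (the constant's value is taken from `karapace.constants`):

```python
DEFAULT_SCHEMA_TOPIC = "_schemas"  # value of karapace.constants.DEFAULT_SCHEMA_TOPIC


def needs_key_formatter(topic_name: str, force_key_correction: bool) -> bool:
    # Mirrors the key_formatter condition in restore_backup above.
    return topic_name == DEFAULT_SCHEMA_TOPIC or force_key_correction


assert needs_key_formatter("_schemas", False)
assert needs_key_formatter("renamed-schemas", True)
assert not needs_key_formatter("renamed-schemas", False)
```
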
tests/integration/backup/test_session_timeout.py (11 additions & 5 deletions)

@@ -5,7 +5,7 @@
 from aiokafka.errors import NoBrokersAvailable
 from confluent_kafka.admin import NewTopic
 from karapace.backup.api import BackupVersion, create_backup
-from karapace.config import Config, DEFAULTS, set_config_defaults
+from karapace.config import Config
 from karapace.kafka.admin import KafkaAdminClient
 from karapace.kafka_utils import kafka_producer_from_config
 from pathlib import Path
@@ -55,9 +55,9 @@ def test_producer_with_custom_kafka_properties_does_not_fail(
     This test ensures that the `session.timeout.ms` can be injected in
     the kafka config so that the exception isn't raised
     """
-    config = set_config_defaults(
-        Config(bootstrap_uri=kafka_server_session_timeout.bootstrap_servers, session_timeout_ms=SESSION_TIMEOUT_MS)
-    )
+    config = Config()
+    config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
+    config.session_timeout_ms = SESSION_TIMEOUT_MS
 
     admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
     admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
@@ -101,6 +101,12 @@ def test_producer_with_custom_kafka_properties_fail(
     admin_client = KafkaAdminClient(bootstrap_servers=kafka_server_session_timeout.bootstrap_servers)
     admin_client.new_topic(new_topic.topic, num_partitions=1, replication_factor=1)
 
+    config = Config()
+    # TODO: This test is broken. Test has used localhost:9092 when this should use
+    # the configured broker from kafka_server_session.
+    # config.bootstrap_uri = kafka_server_session_timeout.bootstrap_servers[0]
+    config.bootstrap_uri = "localhost:9092"
+
     with pytest.raises(NoBrokersAvailable):
-        with kafka_producer_from_config(DEFAULTS) as producer:
+        with kafka_producer_from_config(config) as producer:
             _ = producer
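
The first test above only passes because of the kafka_utils.py change that forwards `session_timeout_ms` into the producer. A toy sketch of that one-to-one mapping from config attributes to client keyword arguments (illustrative names, not the Karapace client API):

```python
from dataclasses import dataclass


@dataclass
class DemoConfig:
    bootstrap_uri: str = "127.0.0.1:9092"
    session_timeout_ms: int = 10000


def producer_kwargs(config: DemoConfig) -> dict:
    # Mirrors kafka_producer_from_config: each config attribute maps
    # directly onto a client constructor keyword argument.
    return {
        "bootstrap_servers": config.bootstrap_uri,
        "retries": 0,
        "session_timeout_ms": config.session_timeout_ms,
    }


config = DemoConfig(session_timeout_ms=65000)  # 65000 is illustrative
assert producer_kwargs(config)["session_timeout_ms"] == 65000
```
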
(The remaining two changed files are not shown in this rendering.)
