diff --git a/karapace/backup/api.py b/karapace/backup/api.py index f5e5f8919..7a691a574 100644 --- a/karapace/backup/api.py +++ b/karapace/backup/api.py @@ -47,7 +47,6 @@ import json import logging import math -import sys import textwrap __all__ = ( @@ -166,7 +165,7 @@ def before_sleep(it: RetryCallState) -> None: result = f"failed ({outcome.exception()})" else: result = f"returned {outcome.result()!r}" - print(f"{description} {result}, retrying... (Ctrl+C to abort)", file=sys.stderr) + LOG.info(f"{description} {result}, retrying... (Ctrl+C to abort)") # pylint: disable=logging-fstring-interpolation return before_sleep @@ -627,12 +626,14 @@ class VerifyLevel(enum.Enum): def verify(backup_location: ExistingFile, level: VerifyLevel) -> None: + console = Console() + error_console = Console(stderr=True) + backup_version = BackupVersion.identify(backup_location) if backup_version is not BackupVersion.V3: - print( - f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}.", - file=sys.stderr, + error_console.print( + f"Only backups using format {BackupVersion.V3.name} can be verified, found {backup_version.name}." ) raise SystemExit(1) @@ -646,7 +647,6 @@ def verify(backup_location: ExistingFile, level: VerifyLevel) -> None: else: assert_never(level) - console = Console() success = True verified_files = 0 diff --git a/karapace/backup/cli.py b/karapace/backup/cli.py index 8edb431f6..2aeaafc2d 100644 --- a/karapace/backup/cli.py +++ b/karapace/backup/cli.py @@ -11,11 +11,16 @@ from .poll_timeout import PollTimeout from karapace.backup.api import VerifyLevel from karapace.config import Config, read_config +from typing import Iterator import argparse +import contextlib +import logging import sys import traceback +logger = logging.getLogger(__name__) + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Karapace schema backup tool") @@ -29,8 +34,10 @@ def parse_args() -> argparse.Namespace: "export-anonymized-avro-schemas", help="Export anonymized Avro schemas into a file" ) + # Options shared by all subparsers. for p in (parser_get, parser_restore, parser_inspect, parser_verify, parser_export_anonymized_avro_schemas): p.add_argument("--location", default="", help="File path for the backup file") + p.add_argument("--verbose", default=False, action="store_true", help="Enable debug logging.") for p in (parser_get, parser_restore, parser_export_anonymized_avro_schemas): p.add_argument("--config", help="Configuration file path", required=True) @@ -77,6 +84,11 @@ def get_config(args: argparse.Namespace) -> Config: def dispatch(args: argparse.Namespace) -> None: + logging.basicConfig( + stream=sys.stderr, + level=logging.DEBUG if args.verbose else logging.INFO, + ) + location = api.normalize_location(args.location) if args.command == "get": @@ -124,29 +136,34 @@ def dispatch(args: argparse.Namespace) -> None: raise NotImplementedError(f"Unknown command: {args.command!r}") -def main() -> None: +@contextlib.contextmanager +def handle_keyboard_interrupt() -> Iterator[None]: try: - args = parse_args() - - try: - dispatch(args) - # TODO: This specific treatment of StaleConsumerError looks quite misplaced - # here, and should probably be pushed down into the (internal) API layer. - except StaleConsumerError as e: - print( - f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic " - f"{e.topic_partition.topic!r} " - f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} " - f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n" - "\n" - "Try increasing --poll-timeout to give the broker more time.", - file=sys.stderr, - ) - raise SystemExit(1) from e + yield except KeyboardInterrupt as e: # Not an error -- user choice -- and thus should not end up in a Python stacktrace. raise SystemExit(2) from e +@handle_keyboard_interrupt() +def main() -> None: + args = parse_args() + + try: + dispatch(args) + # TODO: This specific treatment of StaleConsumerError looks quite misplaced + # here, and should probably be pushed down into the (internal) API layer. + except StaleConsumerError as e: + logger.error( # pylint: disable=logging-fstring-interpolation + f"The Kafka consumer did not receive any records for partition {e.topic_partition.partition} of topic " + f"{e.topic_partition.topic!r} " + f"within the poll timeout ({e.poll_timeout} seconds) while trying to reach offset {e.end_offset:,} " + f"(start was {e.start_offset:,} and the last seen offset was {e.last_offset:,}).\n" + "\n" + "Try increasing --poll-timeout to give the broker more time.", + ) + raise SystemExit(1) from e + + if __name__ == "__main__": main() diff --git a/tests/integration/backup/test_v3_backup.py b/tests/integration/backup/test_v3_backup.py index d10ef3ae4..7bdcf4b40 100644 --- a/tests/integration/backup/test_v3_backup.py +++ b/tests/integration/backup/test_v3_backup.py @@ -30,6 +30,7 @@ import datetime import json +import logging import os import pytest import secrets @@ -37,6 +38,8 @@ import subprocess import textwrap +logger = logging.getLogger(__name__) + @pytest.fixture(scope="function", name="karapace_config") def config_fixture( @@ -350,9 +353,9 @@ def test_exits_with_return_code_3_for_data_restoration_error( try: admin_client.delete_topics([topic_name]) except UnknownTopicOrPartitionError: - print("No previously existing topic.") + logger.info("No previously existing topic.") else: - print("Deleted topic from previous run.") + logger.info("Deleted topic from previous run.") admin_client.create_topics([NewTopic(topic_name, 1, 1)]) with pytest.raises(subprocess.CalledProcessError) as er: @@ -390,9 +393,9 @@ def test_roundtrip_from_file( try: admin_client.delete_topics([topic_name]) except UnknownTopicOrPartitionError: - print("No previously existing topic.") + logger.info("No previously existing topic.") else: - print("Deleted topic from previous run.") + logger.info("Deleted topic from previous run.") # Execute backup restoration. subprocess.run( @@ -483,9 +486,9 @@ def test_roundtrip_from_file_skipping_topic_creation( try: admin_client.delete_topics([topic_name]) except UnknownTopicOrPartitionError: - print("No previously existing topic.") + logger.info("No previously existing topic.") else: - print("Deleted topic from previous run.") + logger.info("Deleted topic from previous run.") admin_client.create_topics( [NewTopic(topic_name, 1, 1)], @@ -578,9 +581,9 @@ def test_backup_restoration_fails_when_topic_does_not_exist_and_skip_creation_is try: admin_client.delete_topics([topic_name]) except UnknownTopicOrPartitionError: - print("No previously existing topic.") + logger.info("No previously existing topic.") else: - print("Deleted topic from previous run.") + logger.info("Deleted topic from previous run.") config = set_config_defaults( { @@ -632,9 +635,9 @@ def test_producer_raises_exceptions( try: admin_client.delete_topics([topic_name]) except UnknownTopicOrPartitionError: - print("No previously existing topic.") + logger.info("No previously existing topic.") else: - print("Deleted topic from previous run.") + logger.info("Deleted topic from previous run.") config = set_config_defaults( {