Replace sync Kafka Producers with confluent_kafka one #765

Merged
aiven-anton merged 1 commit into main from matyaskuti/confluent_kafka_sync_producer
Nov 30, 2023

Contributor

@matyaskuti matyaskuti commented Nov 28, 2023

About this change - What it does

  • Introduce a synchronous, confluent_kafka-based producer client
  • Extract and split off common code from the previously introduced KafkaAdminClient
  • Create karapace.kafka to contain all Kafka client code
  • Still reuse kafka-python's exception classes
  • Many of the changed files contain only import and slight interface adjustments

Why this way

  • Explicit references to kafka-python in the code, including the reliance on its exception classes, can be removed once the sync consumers are also replaced with confluent-kafka-based ones

@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_producer branch 11 times, most recently from cad2885 to 69b591d Compare November 28, 2023 15:13
Contributor Author

This is extracted from the pre-existing karapace/kafka_admin.py, with the exception of the translate_from_kafkaerror function.

self._errors: set[KafkaError] = set()
self.log = logging.getLogger(f"{self.__module__}.{self.__class__.__qualname__}")

super().__init__(self._get_config_from_params(bootstrap_servers, **params)) # type: ignore[call-arg]
Contributor Author

The type: ignore is needed because mypy doesn't know the mixin will be used alongside classes that can handle this __init__ call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.

def _activate_callbacks(self) -> None:
    # Any client in the `confluent_kafka` library needs `poll` called to
    # trigger any callbacks registered (eg. for errors, OAuth tokens, etc.)
    self.poll(timeout=0.0)  # type: ignore[attr-defined]
Contributor Author

The type: ignore is needed because mypy doesn't know the mixin will be used alongside classes that can handle this poll call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.
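
A minimal sketch of the situation these comments describe, using hypothetical stand-in classes (`FakeClient`, `CallbackMixin`, `Client`) rather than the real confluent_kafka clients or the actual Karapace mixin. The mixin calls `super().__init__(config)` and `self.poll(...)`, which only resolve once it is combined with a client class, so mypy needs the ignores when checking the mixin on its own:

```python
from __future__ import annotations


class FakeClient:
    """Hypothetical stand-in for a confluent_kafka client class."""

    def __init__(self, config: dict[str, object]) -> None:
        self.config = config
        self.poll_calls: list[float] = []

    def poll(self, timeout: float = -1) -> int:
        # The real clients serve registered callbacks here; this stub only records the call.
        self.poll_calls.append(timeout)
        return 0


class CallbackMixin:
    """Mixin that relies on methods provided by a sibling base class."""

    def __init__(self, bootstrap_servers: str, **params: object) -> None:
        config = {"bootstrap.servers": bootstrap_servers, **params}
        # Checked in isolation, mypy resolves this to `object.__init__`, which takes no arguments.
        super().__init__(config)  # type: ignore[call-arg]
        self._activate_callbacks()

    def _activate_callbacks(self) -> None:
        # `poll` is not defined on the mixin itself, hence the ignore.
        self.poll(timeout=0.0)  # type: ignore[attr-defined]


class Client(CallbackMixin, FakeClient):
    """Method resolution order: Client, CallbackMixin, FakeClient, object."""


client = Client("localhost:9092")
print(client.config, client.poll_calls)
```

At runtime the method resolution order routes both calls to `FakeClient`, which is why the code works even though the mixin alone does not type-check.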

"""
for _ in range(3):
    try:
        self.list_topics(timeout=1)  # type: ignore[attr-defined]
Contributor Author

The type: ignore is needed because mypy doesn't know the mixin will be used alongside classes that can handle this list_topics call. Unfortunately protocols don't help either, due to the mixin's methods calling each other.
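
The excerpt above only shows the start of the retry loop. A rough sketch of how such a bounded connection check might be completed follows. The function name `verify_connection`, the `ConnectionCheckError` class, and the broad `except Exception` are all assumptions made for illustration, not the code from this PR:

```python
from __future__ import annotations

from typing import Callable


class ConnectionCheckError(Exception):
    """Hypothetical error raised when every attempt fails."""


def verify_connection(list_topics: Callable[..., object], attempts: int = 3) -> None:
    """Call `list_topics` up to `attempts` times, returning on the first success."""
    last_error: Exception | None = None
    for _ in range(attempts):
        try:
            list_topics(timeout=1)
        except Exception as exc:  # assumption: real code would catch a narrower error type
            last_error = exc
            continue
        return
    raise ConnectionCheckError(f"no successful metadata request in {attempts} attempts") from last_error


calls: list[float] = []


def flaky_list_topics(timeout: float) -> list[str]:
    # Fails twice, then succeeds, to exercise the retry path.
    calls.append(timeout)
    if len(calls) < 3:
        raise TimeoutError("broker not reachable yet")
    return ["topic-a"]


verify_connection(flaky_list_topics)
print(len(calls))
```

Chaining the last failure with `from last_error` keeps the underlying cause visible in the traceback when all attempts are exhausted.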

Contributor Author

This is moved from the pre-existing karapace/kafka_admin.py, with some of the code moved to karapace/kafka/common.py

Contributor Author

@matyaskuti matyaskuti Nov 28, 2023

Moved into karapace/kafka/admin.py and karapace/kafka/common.py

@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_producer branch from 69b591d to 54aae1e Compare November 28, 2023 15:29
@matyaskuti matyaskuti marked this pull request as ready for review November 28, 2023 16:35
@matyaskuti matyaskuti requested review from a team as code owners November 28, 2023 16:35
This change replaces all synchronous Kafka producers (from the
kafka-python library) with a new implementation based on
confluent-kafka-python's `Producer`. The aim is to keep the interface
as close to the existing one as possible.

This change does not yet aim to remove all references to kafka-python;
most notably, the developer-friendly errors and exceptions still come
from kafka-python.

A new `karapace.kafka` module is introduced, splitting the previously
added admin client and the new producer into their own modules.

Resources:
* confluent-kafka-python documentation:
  https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#
* librdkafka configuration documentation:
  https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md
@matyaskuti matyaskuti force-pushed the matyaskuti/confluent_kafka_sync_producer branch from 54aae1e to b606514 Compare November 29, 2023 09:31
Contributor

@aiven-anton aiven-anton left a comment

LGTM, nice work 👍

).add_errback(producer_error_callback)
except (KafkaError, AssertionError) as ex:
raise BackupDataRestorationError("Error while calling send on restoring messages") from ex
headers=[(key.decode(), value) for key, value in instruction.headers if key is not None],
Contributor

Note for posterity: this was discussed out of band. Header keys cannot be null, so it's correct to simplify handling here.

Contributor

Why do this instead of asserting that the key isn't None?
This way, if the assumption doesn't hold, we silently skip a header. IMO we should fail when an assumption is violated rather than filter on the premise that it currently holds; if the implementation changes in the future, we would get a silent skip instead of an error.
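
To make the trade-off concrete, here is a small sketch contrasting the two behaviours. Both function names are hypothetical, and the strict variant raises `ValueError` rather than using a bare `assert` (an assumption on my part, since assertions are stripped under `python -O`):

```python
from __future__ import annotations


def headers_skipping(headers: list[tuple[bytes | None, bytes]]) -> list[tuple[str, bytes]]:
    # Mirrors the list comprehension under review: entries with a None key vanish silently.
    return [(key.decode(), value) for key, value in headers if key is not None]


def headers_strict(headers: list[tuple[bytes | None, bytes]]) -> list[tuple[str, bytes]]:
    # Alternative raised in the review: fail loudly if the assumption is violated.
    decoded: list[tuple[str, bytes]] = []
    for key, value in headers:
        if key is None:
            raise ValueError("header key must not be None")
        decoded.append((key.decode(), value))
    return decoded


sample = [(b"trace-id", b"abc"), (None, b"orphan")]
print(headers_skipping(sample))
try:
    headers_strict(sample)
except ValueError as exc:
    print(f"rejected: {exc}")
```

With valid input both variants return the same list; they differ only in how a violated assumption surfaces.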

"ResourceType",
"TopicMetadata",
)

class AdminClient:
    def __init__(self, config: dict[str, str | int | Callable]) -> None: ...
-   def poll(self, timeout: float) -> int: ...
+   def poll(self, timeout: float = -1) -> int: ...
Contributor

Suggested change
- def poll(self, timeout: float = -1) -> int: ...
+ def poll(self, timeout: float = ...) -> int: ...

nit (non-blocking): Default values have no effect in type hints, and are usually omitted to eliminate duplication (see last paragraph of this section for reference).
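
As a quick illustration of the nit: a type checker only needs to know that the parameter is optional, not what the default is, so stubs conventionally write `...`. The same syntax is valid in ordinary Python, which allows a small runnable sketch (`AdminClientStub` is a hypothetical name, not the stub class from this PR):

```python
import inspect


class AdminClientStub:
    # `...` marks the parameter as optional without repeating the runtime default.
    def poll(self, timeout: float = ...) -> int: ...


parameter = inspect.signature(AdminClientStub.poll).parameters["timeout"]
print(parameter.default is Ellipsis)
```

Keeping the concrete value out of the stub means it cannot drift from the library's real default.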

@aiven-anton aiven-anton merged commit dd9e87e into main Nov 30, 2023
8 checks passed
@aiven-anton aiven-anton deleted the matyaskuti/confluent_kafka_sync_producer branch November 30, 2023 11:56
matyaskuti pushed a commit that referenced this pull request Dec 6, 2023
Previously, #765 added skipping of
None header keys. While this causes no problems today, suppressing or
skipping error cases instead of failing explicitly could become an
issue in the future.