Skip to content

Commit

Permalink
Merge pull request #918 from Aiven-Open/anatolii/fix-consumer-client-id
Browse files Browse the repository at this point in the history
Changing the REST proxy consumer client id to be plain string instead of tuple
  • Loading branch information
jjaakola-aiven authored Aug 9, 2024
2 parents edb64b9 + a852c21 commit 99c1046
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions karapace/kafka_rest_apis/consumer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ def _illegal_state_fail(message: str, content_type: str) -> None:
# CONSUMER
async def create_consumer(self, group_name: str, request_data: dict, content_type: str):
group_name = group_name.strip("/")
consumer_name = request_data.get("name") or new_name()
internal_name = self.create_internal_name(group_name, consumer_name)
consumer_name: str = request_data.get("name") or new_name()
internal_name: Tuple[str, str] = self.create_internal_name(group_name, consumer_name)
async with self.consumer_locks[internal_name]:
if internal_name in self.consumers:
LOG.warning(
Expand Down Expand Up @@ -191,7 +191,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ
request_data["auto.commit.enable"] = enable_commit
request_data["auto.offset.reset"] = request_data.get("auto.offset.reset", "earliest")
fetch_min_bytes = request_data.get("fetch.min.bytes", self.config["fetch_min_bytes"])
c = await self.create_kafka_consumer(fetch_min_bytes, group_name, internal_name, request_data)
c = await self.create_kafka_consumer(fetch_min_bytes, group_name, consumer_name, request_data)
except KafkaConfigurationError as e:
KarapaceBase.internal_error(str(e), content_type)
self.consumers[internal_name] = TypedConsumer(
Expand All @@ -200,7 +200,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ
consumer_base_uri = urljoin(self.base_uri, f"consumers/{group_name}/instances/{consumer_name}")
KarapaceBase.r(content_type=content_type, body={"base_uri": consumer_base_uri, "instance_id": consumer_name})

async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data):
async def create_kafka_consumer(self, fetch_min_bytes, group_name, client_id: str, request_data):
for retry in [True, True, False]:
try:
session_timeout_ms = self.config["session_timeout_ms"]
Expand All @@ -212,7 +212,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name
c = AsyncKafkaConsumer(
bootstrap_servers=self.config["bootstrap_uri"],
auto_offset_reset=request_data["auto.offset.reset"],
client_id=internal_name,
client_id=client_id,
enable_auto_commit=request_data["auto.commit.enable"],
fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms"),
fetch_message_max_bytes=self.config["consumer_request_max_bytes"],
Expand Down

0 comments on commit 99c1046

Please sign in to comment.