diff --git a/karapace/kafka_rest_apis/consumer_manager.py b/karapace/kafka_rest_apis/consumer_manager.py index 8175414a5..a2792303b 100644 --- a/karapace/kafka_rest_apis/consumer_manager.py +++ b/karapace/kafka_rest_apis/consumer_manager.py @@ -155,8 +155,8 @@ def _illegal_state_fail(message: str, content_type: str) -> None: # CONSUMER async def create_consumer(self, group_name: str, request_data: dict, content_type: str): group_name = group_name.strip("/") - consumer_name = request_data.get("name") or new_name() - internal_name = self.create_internal_name(group_name, consumer_name) + consumer_name: str = request_data.get("name") or new_name() + internal_name: Tuple[str, str] = self.create_internal_name(group_name, consumer_name) async with self.consumer_locks[internal_name]: if internal_name in self.consumers: LOG.warning( @@ -191,7 +191,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ request_data["auto.commit.enable"] = enable_commit request_data["auto.offset.reset"] = request_data.get("auto.offset.reset", "earliest") fetch_min_bytes = request_data.get("fetch.min.bytes", self.config["fetch_min_bytes"]) - c = await self.create_kafka_consumer(fetch_min_bytes, group_name, internal_name, request_data) + c = await self.create_kafka_consumer(fetch_min_bytes, group_name, consumer_name, request_data) except KafkaConfigurationError as e: KarapaceBase.internal_error(str(e), content_type) self.consumers[internal_name] = TypedConsumer( @@ -200,7 +200,7 @@ async def create_consumer(self, group_name: str, request_data: dict, content_typ consumer_base_uri = urljoin(self.base_uri, f"consumers/{group_name}/instances/{consumer_name}") KarapaceBase.r(content_type=content_type, body={"base_uri": consumer_base_uri, "instance_id": consumer_name}) - async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name, request_data): + async def create_kafka_consumer(self, fetch_min_bytes, group_name, client_id: str, request_data): for retry in [True, True, False]: try: session_timeout_ms = self.config["session_timeout_ms"] @@ -212,7 +212,7 @@ async def create_kafka_consumer(self, fetch_min_bytes, group_name, internal_name c = AsyncKafkaConsumer( bootstrap_servers=self.config["bootstrap_uri"], auto_offset_reset=request_data["auto.offset.reset"], - client_id=internal_name, + client_id=client_id, enable_auto_commit=request_data["auto.commit.enable"], fetch_max_wait_ms=self.config.get("consumer_fetch_max_wait_ms"), fetch_message_max_bytes=self.config["consumer_request_max_bytes"],