Skip to content

Commit

Permalink
Pass logger to confluent producer and consumer (#1464)
Browse files Browse the repository at this point in the history
* Pass logger to confluent producer and consumer

* Add test to check consumer_logger

* Update func signature

* Update function signature

* User LoggerProto and remove Annotated

* Use only logger in signature

* Use LoggerProto as type
  • Loading branch information
kumaranvpl authored May 25, 2024
1 parent d5dce31 commit 1237edb
Show file tree
Hide file tree
Showing 8 changed files with 92 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dependant
dependant
6 changes: 3 additions & 3 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1079,
"line_number": 1269,
"is_secret": false
}
],
Expand All @@ -138,7 +138,7 @@
"filename": "examples/e10_middlewares.py",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 33,
"line_number": 35,
"is_secret": false
}
],
Expand All @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-04-23T11:41:19Z"
"generated_at": "2024-05-24T07:31:48Z"
}
2 changes: 1 addition & 1 deletion docs/docs/en/nats/jetstream/key-value.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ from faststream.nats import NatsBroker, KvWatch
)
async def handler(msg: str):
...
```
```
2 changes: 1 addition & 1 deletion docs/docs/en/nats/jetstream/object.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ from faststream.nats import NatsBroker, ObjWatch
)
async def handler(filename: str):
...
```
```
24 changes: 12 additions & 12 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,37 @@ hide:
This is the time for a new **NATS** features! **FastStream** supports **NATS Key-Value** and **Object Storage** subscribption features in a native way now (big thx for @sheldygg)!

1. KeyValue creation and watching API added (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/key-value/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("some-key", kv_watch="bucket")
async def handler(msg: int, logger: Logger):
logger.info(msg)

@app.after_startup
async def test():
kv = await broker.key_value("bucket")
await kv.put("some-key", b"1")
```

2. ObjectStore API added as well (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/object/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("file-bucket", obj_watch=True)
async def handler(filename: str, logger: Logger):
logger.info(filename)

@app.after_startup
async def test():
object_store = await broker.object_storage("file-bucket")
Expand All @@ -74,22 +74,22 @@ This is the time for a new **NATS** features! **FastStream** supports **NATS Key
```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
logger.info(msg)
```

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

```python
@broker.subscriber("tests", no_reply=True)
async def handler():
....

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
```
Expand Down
2 changes: 2 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ async def _connect( # type: ignore[override]
producer = AsyncConfluentProducer(
**kwargs,
client_id=client_id,
logger=self.logger,
)

self._producer = AsyncConfluentFastProducer(
Expand All @@ -457,6 +458,7 @@ async def _connect( # type: ignore[override]
return partial(
AsyncConfluentConsumer,
**filter_by_dict(ConsumerConnectionParams, kwargs),
logger=self.logger,
)

async def start(self) -> None:
Expand Down
19 changes: 17 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from ssl import SSLContext
from time import time
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -16,10 +17,14 @@
from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from pydantic import BaseModel
from typing_extensions import Annotated, Doc

from faststream.log import logger
from faststream.utils.functions import call_or_await

if TYPE_CHECKING:
from faststream.types import LoggerProto

_missing = object()


Expand Down Expand Up @@ -105,7 +110,12 @@ def __init__(
sasl_kerberos_service_name: str = "kafka",
sasl_kerberos_domain_name: Optional[str] = None,
sasl_oauth_token_provider: Optional[str] = None,
logger: Annotated[
Union["LoggerProto", None, object],
Doc("User specified logger to pass into Context and log service messages."),
] = logger,
) -> None:
self.logger = logger
if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers, str
):
Expand Down Expand Up @@ -145,7 +155,7 @@ def __init__(
}
)

self.producer = Producer(self.config)
self.producer = Producer(self.config, logger=self.logger)
# self.producer.init_transactions()
self.producer.list_topics()
self.loop = loop or asyncio.get_event_loop()
Expand Down Expand Up @@ -295,7 +305,12 @@ def __init__(
sasl_kerberos_service_name: str = "kafka",
sasl_kerberos_domain_name: Optional[str] = None,
sasl_oauth_token_provider: Optional[str] = None,
logger: Annotated[
Union["LoggerProto", None, object],
Doc("User specified logger to pass into Context and log service messages."),
] = logger,
) -> None:
self.logger = logger
if group_id is None:
group_id = "confluent-kafka-consumer-group"

Expand Down Expand Up @@ -352,7 +367,7 @@ def __init__(
self.loop = loop or asyncio.get_event_loop()

create_topics(topics=self.topics, config=self.config)
self.consumer = Consumer(self.config)
self.consumer = Consumer(self.config, logger=self.logger)

async def start(self) -> None:
"""Starts the Kafka consumer and subscribes to the specified topics."""
Expand Down
55 changes: 55 additions & 0 deletions tests/brokers/confluent/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import asyncio
import logging
from typing import Any, ClassVar, Dict

import pytest

from faststream.broker.core.usecase import BrokerUsecase
from faststream.confluent import KafkaBroker


@pytest.mark.confluent()
class TestLogger:
"""A class to represent a test Kafka broker."""

timeout: int = 10
subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"}

def get_broker(self, apply_types: bool = False):
return KafkaBroker(apply_types=apply_types)

def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
return broker

@pytest.mark.asyncio()
async def test_custom_logger(
self,
queue: str,
event: asyncio.Event,
):
test_logger = logging.getLogger("test_logger")
consume_broker = KafkaBroker(logger=test_logger)

@consume_broker.subscriber(queue, **self.subscriber_kwargs)
def subscriber(m):
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()

for sub in br._subscribers.values():
consumer_logger = sub.consumer.logger
assert consumer_logger == test_logger

producer_logger = br._producer._producer.logger
assert producer_logger == test_logger

await asyncio.wait(
(
asyncio.create_task(br.publish("hello", queue)),
asyncio.create_task(event.wait()),
),
timeout=self.timeout,
)

assert event.is_set()

0 comments on commit 1237edb

Please sign in to comment.