Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass logger to confluent producer and consumer #1464

Merged
merged 10 commits into from
May 25, 2024
2 changes: 1 addition & 1 deletion .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
dependant
dependant
6 changes: 3 additions & 3 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1079,
"line_number": 1269,
"is_secret": false
}
],
Expand All @@ -138,7 +138,7 @@
"filename": "examples/e10_middlewares.py",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 33,
"line_number": 35,
"is_secret": false
}
],
Expand All @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-04-23T11:41:19Z"
"generated_at": "2024-05-24T07:31:48Z"
}
2 changes: 1 addition & 1 deletion docs/docs/en/nats/jetstream/key-value.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,4 @@ from faststream.nats import NatsBroker, KvWatch
)
async def handler(msg: str):
...
```
```
2 changes: 1 addition & 1 deletion docs/docs/en/nats/jetstream/object.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,4 @@ from faststream.nats import NatsBroker, ObjWatch
)
async def handler(filename: str):
...
```
```
24 changes: 12 additions & 12 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,37 +32,37 @@ hide:
It is time for new **NATS** features! **FastStream** now supports **NATS Key-Value** and **Object Storage** subscription features in a native way (big thanks to @sheldygg)!

1. KeyValue creation and watching API added (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/key-value/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("some-key", kv_watch="bucket")
async def handler(msg: int, logger: Logger):
logger.info(msg)

@app.after_startup
async def test():
kv = await broker.key_value("bucket")
await kv.put("some-key", b"1")
```

2. ObjectStore API added as well (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/object/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("file-bucket", obj_watch=True)
async def handler(filename: str, logger: Logger):
logger.info(filename)

@app.after_startup
async def test():
object_store = await broker.object_storage("file-bucket")
Expand All @@ -74,22 +74,22 @@ This is the time for a new **NATS** features! **FastStream** supports **NATS Key
```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
logger.info(msg)
```

Finally, we have a new feature related to all brokers: a special flag to suppress automatic RPC and reply_to responses:

```python
@broker.subscriber("tests", no_reply=True)
async def handler():
...

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
```
Expand Down
2 changes: 2 additions & 0 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ async def _connect( # type: ignore[override]
producer = AsyncConfluentProducer(
**kwargs,
client_id=client_id,
logger=self.logger,
)

self._producer = AsyncConfluentFastProducer(
Expand All @@ -457,6 +458,7 @@ async def _connect( # type: ignore[override]
return partial(
AsyncConfluentConsumer,
**filter_by_dict(ConsumerConnectionParams, kwargs),
logger=self.logger,
)

async def start(self) -> None:
Expand Down
19 changes: 17 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from ssl import SSLContext
from time import time
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Expand All @@ -16,10 +17,14 @@
from confluent_kafka import Consumer, KafkaError, KafkaException, Message, Producer
from confluent_kafka.admin import AdminClient, NewTopic
from pydantic import BaseModel
from typing_extensions import Annotated, Doc

from faststream.log import logger
from faststream.utils.functions import call_or_await

if TYPE_CHECKING:
from faststream.types import LoggerProto

_missing = object()


Expand Down Expand Up @@ -105,7 +110,12 @@ def __init__(
sasl_kerberos_service_name: str = "kafka",
sasl_kerberos_domain_name: Optional[str] = None,
sasl_oauth_token_provider: Optional[str] = None,
logger: Annotated[
Union["LoggerProto", None, object],
Doc("User specified logger to pass into Context and log service messages."),
] = logger,
) -> None:
self.logger = logger
if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers, str
):
Expand Down Expand Up @@ -145,7 +155,7 @@ def __init__(
}
)

self.producer = Producer(self.config)
self.producer = Producer(self.config, logger=self.logger)
# self.producer.init_transactions()
self.producer.list_topics()
self.loop = loop or asyncio.get_event_loop()
Expand Down Expand Up @@ -295,7 +305,12 @@ def __init__(
sasl_kerberos_service_name: str = "kafka",
sasl_kerberos_domain_name: Optional[str] = None,
sasl_oauth_token_provider: Optional[str] = None,
logger: Annotated[
Union["LoggerProto", None, object],
Doc("User specified logger to pass into Context and log service messages."),
] = logger,
) -> None:
self.logger = logger
if group_id is None:
group_id = "confluent-kafka-consumer-group"

Expand Down Expand Up @@ -352,7 +367,7 @@ def __init__(
self.loop = loop or asyncio.get_event_loop()

create_topics(topics=self.topics, config=self.config)
self.consumer = Consumer(self.config)
self.consumer = Consumer(self.config, logger=self.logger)

async def start(self) -> None:
"""Starts the Kafka consumer and subscribes to the specified topics."""
Expand Down
55 changes: 55 additions & 0 deletions tests/brokers/confluent/test_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import asyncio
import logging
from typing import Any, ClassVar, Dict

import pytest

from faststream.broker.core.usecase import BrokerUsecase
from faststream.confluent import KafkaBroker


@pytest.mark.confluent()
class TestLogger:
    """Integration tests checking that a user-supplied logger is propagated
    from ``KafkaBroker`` down to the underlying confluent consumer(s) and
    producer.

    Requires a reachable Kafka/Confluent instance (``confluent`` mark);
    ``queue`` and ``event`` are injected as pytest fixtures.
    """

    # Upper bound (seconds) for the publish/consume round-trip below.
    timeout: int = 10
    # Kwargs applied to every subscriber: read from the earliest offset so
    # the published test message is seen even if the consumer joins late.
    subscriber_kwargs: ClassVar[Dict[str, Any]] = {"auto_offset_reset": "earliest"}

    def get_broker(self, apply_types: bool = False) -> KafkaBroker:
        """Create a fresh broker instance for a test case."""
        return KafkaBroker(apply_types=apply_types)

    def patch_broker(self, broker: BrokerUsecase[Any, Any]) -> BrokerUsecase[Any, Any]:
        """Hook for subclasses to wrap/patch the broker; identity by default."""
        return broker

    @pytest.mark.asyncio()
    async def test_custom_logger(
        self,
        queue: str,
        event: asyncio.Event,
    ) -> None:
        """Broker built with ``logger=...`` must hand that exact logger to
        every underlying consumer and to the producer, and still deliver
        messages end-to-end.
        """
        test_logger = logging.getLogger("test_logger")
        consume_broker = KafkaBroker(logger=test_logger)

        @consume_broker.subscriber(queue, **self.subscriber_kwargs)
        def subscriber(m):
            event.set()

        async with self.patch_broker(consume_broker) as br:
            await br.start()

            # Each subscriber's confluent consumer should carry the
            # custom logger (set via AsyncConfluentConsumer(logger=...)).
            for sub in br._subscribers.values():
                consumer_logger = sub.consumer.logger
                assert consumer_logger == test_logger

            # The producer should carry it as well
            # (set via AsyncConfluentProducer(logger=...)).
            producer_logger = br._producer._producer.logger
            assert producer_logger == test_logger

            # Round-trip a message; wait() bounds the test rather than
            # hanging forever if delivery fails.
            await asyncio.wait(
                (
                    asyncio.create_task(br.publish("hello", queue)),
                    asyncio.create_task(event.wait()),
                ),
                timeout=self.timeout,
            )

        # The subscriber must have fired, proving delivery still works
        # with the custom logger installed.
        assert event.is_set()
Loading