Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
fbraem authored May 23, 2024
2 parents 16494a9 + f6b3336 commit 8a3f6bd
Show file tree
Hide file tree
Showing 34 changed files with 1,796 additions and 734 deletions.
78 changes: 78 additions & 0 deletions docs/docs/en/release.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,84 @@ hide:
---

# Release Notes
## 0.5.8

### What's Changed

This is the time for a new **NATS** features! **FastStream** supports **NATS Key-Value** and **Object Storage** subscribption features in a native way now (big thx for @sheldygg)!

1. KeyValue creation and watching API added (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/key-value/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("some-key", kv_watch="bucket")
async def handler(msg: int, logger: Logger):
logger.info(msg)

@app.after_startup
async def test():
kv = await broker.key_value("bucket")
await kv.put("some-key", b"1")
```

2. ObjectStore API added as well (you can read updated [documentation section](https://faststream.airt.ai/latest/nats/jetstream/object/) for changes):

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("file-bucket", obj_watch=True)
async def handler(filename: str, logger: Logger):
logger.info(filename)

@app.after_startup
async def test():
object_store = await broker.object_storage("file-bucket")
await object_store.put("some-file.txt", b"1")
```

3. Also now you can use just `pull_sub=True` instead of `pull_sub=PullSub()` in basic case:

```python
from faststream import FastStream, Logger
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
logger.info(msg)
```

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

```python
@broker.subscriber("tests", no_reply=True)
async def handler():
....

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
```

* fix: when headers() returns None in AsyncConfluentParser, replace it with an empty tuple by @andreaimprovised in https://github.com/airtai/faststream/pull/1460
* Implement Kv/Obj watch. by @sheldygg in https://github.com/airtai/faststream/pull/1383
* feat: add subscriber no-reply option by @Lancetnik in https://github.com/airtai/faststream/pull/1461

### New Contributors
* @andreaimprovised made their first contribution in https://github.com/airtai/faststream/pull/1460

**Full Changelog**: https://github.com/airtai/faststream/compare/0.5.7...0.5.8

## 0.5.7

### What's Changed
Expand Down
1 change: 0 additions & 1 deletion faststream/broker/publisher/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.publisher.proto import PublisherProto
from faststream.broker.types import (
BrokerMiddleware,
MsgType,
P_HandlerParams,
T_HandlerReturn,
Expand Down
18 changes: 15 additions & 3 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from faststream.asyncapi.abc import AsyncAPIOperation
from faststream.asyncapi.message import parse_handler_params
from faststream.asyncapi.utils import to_camelcase
from faststream.broker.publisher.proto import ProducerProto
from faststream.broker.subscriber.call_item import HandlerItem
from faststream.broker.subscriber.proto import SubscriberProto
from faststream.broker.types import (
Expand All @@ -40,6 +39,7 @@

from faststream.broker.message import StreamMessage
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.publisher.proto import BasePublisherProto, ProducerProto
from faststream.broker.types import (
AsyncCallable,
BrokerMiddleware,
Expand Down Expand Up @@ -93,6 +93,7 @@ def __init__(
self,
*,
no_ack: bool,
no_reply: bool,
retry: Union[bool, int],
broker_dependencies: Iterable["Depends"],
broker_middlewares: Iterable["BrokerMiddleware[MsgType]"],
Expand All @@ -108,6 +109,7 @@ def __init__(

self._default_parser = default_parser
self._default_decoder = default_decoder
self._no_reply = no_reply
# Watcher args
self._no_ack = no_ack
self._retry = retry
Expand Down Expand Up @@ -139,7 +141,7 @@ def setup( # type: ignore[override]
self,
*,
logger: Optional["LoggerProto"],
producer: Optional[ProducerProto],
producer: Optional["ProducerProto"],
graceful_timeout: Optional[float],
extra_context: "AnyDict",
# broker options
Expand Down Expand Up @@ -338,7 +340,7 @@ async def consume(self, msg: MsgType) -> Any:
)

for p in chain(
self._make_response_publisher(message),
self.__get_reponse_publisher(message),
h.handler._publishers,
):
await p.publish(
Expand All @@ -358,6 +360,16 @@ async def consume(self, msg: MsgType) -> Any:

return None

def __get_reponse_publisher(
self,
message: "StreamMessage[MsgType]",
) -> Iterable["BasePublisherProto"]:
if not message.reply_to or self._no_reply:
return ()

else:
return self._make_response_publisher(message)

def get_log_context(
self,
message: Optional["StreamMessage[MsgType]"],
Expand Down
Loading

0 comments on commit 8a3f6bd

Please sign in to comment.