Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: new subscriber inner logic #1128

Closed
wants to merge 90 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
90 commits
Select commit Hold shift + click to select a range
90777a5
refactor: new subscriber logic in NATS
Lancetnik Jan 10, 2024
13fcec0
tests: fix raw tests
Lancetnik Jan 10, 2024
d82f518
refactor: nats completed
Lancetnik Jan 10, 2024
8a33552
chore: remove useless
Lancetnik Jan 10, 2024
0695379
lint: fix ruff warnings
Lancetnik Jan 10, 2024
609d243
refactor: remove useless broker methods
Lancetnik Jan 10, 2024
c223cc7
refactor: move wrap handler logic to mixin class
Lancetnik Jan 10, 2024
d28e60b
refactor: remove useless broker methods
Lancetnik Jan 10, 2024
3aa5d1d
refactor: new handler consume logic
Lancetnik Jan 11, 2024
15cd9b6
refactore: new middlewares
Lancetnik Jan 11, 2024
0b9d4f4
fix #690: ack message after response publish
Lancetnik Jan 11, 2024
0b8c7c9
fix #840: broker.publish respects middlewares
Lancetnik Jan 11, 2024
2c4c28a
fix: correct connection closing
Lancetnik Jan 13, 2024
c12df70
fix: return published message in tests
Lancetnik Jan 13, 2024
1ee5d1e
fix: correct middlewares
Lancetnik Jan 13, 2024
7fab3f2
refactor: remove useless code
Lancetnik Jan 13, 2024
cc5c7fb
refactore: catch StopConsume at any level
Lancetnik Jan 13, 2024
1d4526a
refactor: new subscriber middlewares
Lancetnik Jan 14, 2024
fd48643
refactor: nats new structure
Lancetnik Jan 14, 2024
3984422
refactor: nats new structure
Lancetnik Jan 14, 2024
ca7d7d7
feat: publisher middlewares
Lancetnik Jan 14, 2024
d072a00
lint: publisher middlewares hint
Lancetnik Jan 14, 2024
827c316
tests: router middlewares & depends tests
Lancetnik Jan 14, 2024
2c9896b
refactor: new logging logic
Lancetnik Jan 16, 2024
9956cf4
fix: correct Windows shutdown
Lancetnik Jan 17, 2024
a82997e
refactor: fix all nats tests
Lancetnik Jan 17, 2024
51dfa79
refactor: copy broker publisher middleware to all publishers
Lancetnik Jan 17, 2024
ff12138
refactor: pass all args to publisher middleware
Lancetnik Jan 18, 2024
34aa874
refactor: unify AsyncAPI names
Lancetnik Jan 20, 2024
710482d
fix #1143: ignore Depends in AsyncAPI
Lancetnik Jan 20, 2024
3a38330
refactor: split regular and batch handlers
Lancetnik Jan 20, 2024
72bf001
refactor: split parsers
Lancetnik Jan 20, 2024
2ac0f2e
chore: add useles comment
Lancetnik Jan 20, 2024
1817936
fix: correct logging
Lancetnik Jan 20, 2024
244c936
refactor: new RMQ logic
Lancetnik Jan 22, 2024
eb4b548
refactor: new Kakfa
Lancetnik Jan 23, 2024
729f044
refactor: fix Kafka consumer tests
Lancetnik Jan 24, 2024
8255742
refactor: complete Kafka
Lancetnik Jan 31, 2024
8af43a7
refactor: raw Redis
Lancetnik Feb 1, 2024
0ab95fc
fix: correct RMQ default ssl port
Lancetnik Feb 1, 2024
f6a7f6c
refactor: split Redis consumers
Lancetnik Feb 2, 2024
8095852
feat: stoppable application
Lancetnik Feb 3, 2024
c23794e
refactor: explicit NATS annotations
Lancetnik Feb 7, 2024
a26211f
refactor: remove fastapi logic from core
Lancetnik Feb 7, 2024
5a7f342
refactor: remove logging args
Lancetnik Feb 7, 2024
386716b
chore: merge main
Lancetnik Feb 21, 2024
6918c18
fix: stop RMQ Broker correct
Lancetnik Feb 21, 2024
c75903a
refactor: mv nats init args
Lancetnik Feb 21, 2024
27e49b4
feat (#1252): respect Redis StreamSub last_id with consumer group
Lancetnik Feb 23, 2024
72d89de
feat: FastStream object public methods
Lancetnik Feb 23, 2024
c641058
fix: correct Redis consumer group behavior
Lancetnik Feb 24, 2024
f8c8c8e
fix: correct workers shutdown
Lancetnik Feb 24, 2024
8f5115a
refactor: explicit NATS init
Lancetnik Feb 24, 2024
226f22c
refactor: explicit NatsBroker options
Lancetnik Feb 24, 2024
607dfb1
fix: NastBroker correct publish method
Lancetnik Feb 24, 2024
030088b
refactor: setup NATS init Doc
Lancetnik Feb 25, 2024
4d3bcee
refactor: explicit NatsPublisher options
Lancetnik Feb 25, 2024
ed9bc20
lint: ignore overrides
Lancetnik Feb 27, 2024
b8c1710
chore: stash changes
Lancetnik Mar 5, 2024
8766ac9
refactor: add NatsBroker publisher Docs
Lancetnik Mar 5, 2024
043911f
refactor: detail NATS Docstrings
Lancetnik Mar 5, 2024
ac8de39
refactor: fix missing RMQ typess
Lancetnik Mar 5, 2024
ebeb9aa
refactor: make all brokers compatible with new parent
Lancetnik Mar 6, 2024
7e70e4b
refactor: explicit RMQ Message options
Lancetnik Mar 6, 2024
e02d5aa
refactor: explicit RMQ publishers options
Lancetnik Mar 7, 2024
55d738e
refactor: explicit RMQ routers
Lancetnik Mar 11, 2024
c05e398
refactor: make RabbitMQ object pydantic-independent
Lancetnik Mar 13, 2024
16494e2
refactor: make JStream pydantic independant
Lancetnik Mar 13, 2024
1dacddc
refactor: merge router.publisher middlewares with routers' ones
Lancetnik Mar 13, 2024
8da2cc2
refactor: remove pydantic.BaseModel usages in basic classes
Lancetnik Mar 13, 2024
0307dc8
feat: remove pydantic, fastapi, uvloop dependencies
Lancetnik Mar 14, 2024
64f2642
format: run formatter
Lancetnik Mar 14, 2024
aa0be65
refactor: compelete RMQ annotations
Lancetnik Mar 15, 2024
9e41b6c
refactor: explicit Redis annotations
Lancetnik Mar 18, 2024
38d02f0
refactor: compelete RMQ FastAPI annotations
Lancetnik Mar 18, 2024
fe8ceb9
refactor: fix kafka tests
Lancetnik Mar 19, 2024
08dc0d0
format: run formatter
Lancetnik Mar 19, 2024
eb4da46
Add some documented annotations
AbstractiveNord Mar 20, 2024
3c5df63
Merge pull request #1321 from AbstractiveNord/big-refactor
Lancetnik Mar 20, 2024
6c702dd
refactor: start Confluent
Lancetnik Mar 20, 2024
5cbaba2
style: polish NATS formating
Lancetnik Mar 20, 2024
5623aef
refactor: explicit NATS router
Lancetnik Mar 21, 2024
5692360
chore: update tests
Lancetnik Mar 21, 2024
8589e42
style: fix some Rabbit mypy
Lancetnik Mar 22, 2024
38cae42
refactor: rm useless cache method:
Lancetnik Mar 22, 2024
112ec21
lint: Rabbit and NATS mypy
Lancetnik Mar 25, 2024
8f96be7
chore: fix email
Lancetnik Mar 26, 2024
e921454
feat: return handler object in subscriber
Lancetnik Mar 26, 2024
745a195
lint: fix some mypy
Lancetnik Mar 27, 2024
2a79221
feat: support FastAPI response_model
Lancetnik Mar 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
2 changes: 1 addition & 1 deletion docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ search:
- [BrokerUsecase](api/faststream/broker/core/abc/BrokerUsecase.md)
- [extend_dependencies](api/faststream/broker/core/abc/extend_dependencies.md)
- asynchronous
- [BrokerAsyncUsecase](api/faststream/broker/core/asynchronous/BrokerAsyncUsecase.md)
- [BrokerUsecase](api/faststream/broker/core/asynchronous/BrokerUsecase.md)
- [default_filter](api/faststream/broker/core/asynchronous/default_filter.md)
- mixins
- [LoggingMixin](api/faststream/broker/core/mixins/LoggingMixin.md)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.broker.core.asynchronous.BrokerAsyncUsecase
::: faststream.broker.core.asynchronous.BrokerUsecase
14 changes: 8 additions & 6 deletions docs/docs/en/getting-started/logging.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,23 @@ Start with the **Structlog** [guide](https://www.structlog.org/en/stable/logging
import sys
import structlog

shared_processors = [
shared_processors = (
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
]
)

if sys.stderr.isatty():
# terminal session
processors = shared_processors + [
structlog.dev.ConsoleRenderer()
processors = [
*shared_processors,
structlog.dev.ConsoleRenderer(),
]
else:
# Docker container session
processors = shared_processors + [
processors = [
*shared_processors,
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
Expand Down Expand Up @@ -193,7 +195,7 @@ def merge_contextvars(
) -> structlog.types.EventDict:
event_dict["extra"] = event_dict.get(
"extra",
context.get("log_context", {}),
context.get_local("log_context") or {},
)
return event_dict

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/en/getting-started/middlewares/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ class MyMiddleware(BaseMiddleware):
print(f"Received: {self.message}")
return await super().on_receive()

async def after_processed(self, exc_type, exc_val, exec_tb):
return await super().after_processed(exc_type, exc_val, exec_tb)
async def after_processed(self, exc_type, exc_val, exc_tb):
return await super().after_processed(exc_type, exc_val, exc_tb)
```

These methods should be overwritten only in a broker-level middlewares.
Expand Down
2 changes: 1 addition & 1 deletion docs/overrides/home.html
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ <h6>moosethemucha</h6>
</div>
<div class="text--center padding-horiz--md">
<p class="testimonialDescription_MWAM">This looks really cool - as someone who had to create a
distributed application using rabbitmq and aiopika - this would have saved me alot of time - I
distributed application using rabbitmq and aiopika - this would have saved me a lot of time - I
love the decorations for pub sub.</p>
</div>
</div>
Expand Down
4 changes: 2 additions & 2 deletions examples/e10_middlewares.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exec_tb: Optional[TracebackType] = None,
exc_tb: Optional[TracebackType] = None,
) -> bool:
print("highlevel middleware out")
return await super().after_processed(exc_type, exc_val, exec_tb)
return await super().after_processed(exc_type, exc_val, exc_tb)


class HandlerMiddleware(BaseMiddleware):
Expand Down
2 changes: 1 addition & 1 deletion examples/fastapi_integration/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

@publisher
@router.subscriber("test-q")
async def handler(user_id: int, logger: Logger):
async def handler(user_id: int, logger: Logger) -> str:
logger.info(user_id)
return f"{user_id} created"

Expand Down
14 changes: 9 additions & 5 deletions examples/howto/structlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,27 @@ def merge_contextvars(
) -> structlog.types.EventDict:
event_dict["extra"] = event_dict.get(
"extra",
context.get("log_context", {}),
context.get_local("log_context") or {},
)
return event_dict


shared_processors = [
shared_processors = (
merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.StackInfoRenderer(),
structlog.dev.set_exc_info,
structlog.processors.TimeStamper(fmt="iso"),
]
)

if sys.stderr.isatty():
processors = shared_processors + [structlog.dev.ConsoleRenderer()]
processors = [
*shared_processors,
structlog.dev.ConsoleRenderer(),
]
else:
processors = shared_processors + [
processors = [
*shared_processors,
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
]
Expand Down
4 changes: 3 additions & 1 deletion faststream/__about__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
"""Simple and fast framework to create message brokers based microservices."""
__version__ = "0.4.3"
__version__ = "0.5.0"


SERVICE_NAME = f"faststream-{__version__}"

INSTALL_YAML = """
To generate YAML documentation, please install dependencies:\n
pip install PyYAML
Expand Down
19 changes: 7 additions & 12 deletions faststream/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ def raise_fastapi_validation_error(errors: List[Any], body: AnyDict) -> Never:

JsonSchemaValue = Mapping[str, Any]

PValidationError: Optional[Type[Exception]]
try:
from pydantic import ValidationError
PValidationError = ValidationError
except ImportError:
PValidationError = None

if PYDANTIC_V2:
if PYDANTIC_VERSION >= "2.4.0":
from pydantic.annotated_handlers import (
Expand Down Expand Up @@ -110,9 +117,6 @@ def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:
def model_to_json(model: BaseModel, **kwargs: Any) -> str:
return model.model_dump_json(**kwargs)

def model_to_dict(model: BaseModel, **kwargs: Any) -> AnyDict:
return model.model_dump(**kwargs)

def model_parse(
model: Type[ModelVar], data: Union[str, bytes], **kwargs: Any
) -> ModelVar:
Expand All @@ -121,9 +125,6 @@ def model_parse(
def model_schema(model: Type[BaseModel], **kwargs: Any) -> AnyDict:
return model.model_json_schema(**kwargs)

def model_copy(model: ModelVar, **kwargs: Any) -> ModelVar:
return model.model_copy(**kwargs)

else:
from pydantic.json import pydantic_encoder

Expand All @@ -141,9 +142,6 @@ def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:
def model_to_json(model: BaseModel, **kwargs: Any) -> str:
return model.json(**kwargs)

def model_to_dict(model: BaseModel, **kwargs: Any) -> AnyDict:
return model.dict(**kwargs)

def model_parse(
model: Type[ModelVar], data: Union[str, bytes], **kwargs: Any
) -> ModelVar:
Expand All @@ -158,9 +156,6 @@ def model_to_jsonable(
) -> Any:
return json_loads(model.json(**kwargs))

def model_copy(model: ModelVar, **kwargs: Any) -> ModelVar:
return model.copy(**kwargs)

# TODO: pydantic types misc
def with_info_plain_validator_function( # type: ignore[misc]
function: Callable[..., Any],
Expand Down
8 changes: 5 additions & 3 deletions faststream/annotations.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

from typing_extensions import Annotated

from faststream.utils.context import Context as ContextField
from faststream.app import FastStream as FS
from faststream.utils.context import Context
from faststream.utils.context import ContextRepo as CR
from faststream.utils.no_cast import NoCast as NC

_NoCastType = TypeVar("_NoCastType")

Logger = Annotated[logging.Logger, ContextField("logger")]
ContextRepo = Annotated[CR, ContextField("context")]
Logger = Annotated[logging.Logger, Context("logger")]
ContextRepo = Annotated[CR, Context("context")]
NoCast = Annotated[_NoCastType, NC()]
FastStream = Annotated[FS, Context("app")]
Loading