Python CQRS pattern implementation.
pip install meator
- Dispatchers:
- Command
- Query
- Observers:
- Event
- Entities:
- Command
- Event
- Query
- Middlewares
from dataclasses import dataclass
from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command
from meator.interfaces import CommandHandler
@dataclass
class IntCommand(Command[int]):
answer: int
class IntCommandHandler(CommandHandler[IntCommand, int]):
async def __call__(self, request: IntCommand) -> int:
return request.answer
async def main():
c = CommandDispatcherImpl()
c.register(IntCommand, IntCommandHandler())
await c.handle(IntCommand(1))
from dataclasses import dataclass
from meator.dispatchers import CommandDispatcherImpl
from meator.entities import Command, Request
from meator.interfaces import CommandHandler, Handler, Middleware
class SimpleMiddleware(Middleware):
async def __call__(self, call_next: Handler, request: Request):
return await call_next(request)
@dataclass
class IntCommand(Command[int]):
answer: int
class IntCommandHandler(CommandHandler[IntCommand, int]):
async def __call__(self, request: IntCommand) -> int:
return request.answer
async def main():
c = CommandDispatcherImpl(middlewares=[SimpleMiddleware()])
c.register(IntCommand, IntCommandHandler())
await c.handle(IntCommand(1))
pytest tests
Inspired by didator