diff --git a/README.md b/README.md index eb2d805..df620fb 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,14 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi - Multiple consumers in one consumer group - No scheduler needed, consumer handles itself +## Configuration + +If using `BrqConfig`, you can use a `.env` file and environment variables to configure brq. The prefix of environment variables is `BRQ_`. + +> For example, `BRQ_REDIS_PORT=6379 python consumer.py` for specifying redis port. + +See [configs](./brq/configs.py) for more details. + ## Echo job overview ### Producer @@ -40,21 +48,18 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi import os from brq.producer import Producer -from brq.tools import get_redis_client, get_redis_url +from brq.configs import BrqConfig async def main(): - redis_url = get_redis_url( - host=os.getenv("REDIS_HOST", "localhost"), - port=int(os.getenv("REDIS_PORT", 6379)), - db=int(os.getenv("REDIS_DB", 0)), - cluster=bool(os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"]), - tls=bool(os.getenv("REDIS_TLS", "false") in ["True", "true", "1"]), - username=os.getenv("REDIS_USERNAME", ""), - password=os.getenv("REDIS_PASSWORD", ""), - ) - async with get_redis_client(redis_url) as async_redis_client: - await Producer(async_redis_client).run_job("echo", ["hello"]) + config = BrqConfig() + async with config.open_redis_client() as async_redis_client: + await Producer( + async_redis_client, + redis_prefix=config.redis_key_prefix, + redis_seperator=config.redis_key_seperator, + max_message_len=config.producer_max_message_length, + ).run_job("echo", ["hello"]) if __name__ == "__main__": @@ -65,6 +70,27 @@ if __name__ == "__main__": ### Consumer +The only thing you need is `@task`, and the target function can be `sync` or `async` and `sync` function will be converted to `async` function and run in a thread automatically. + +```python +from brq import task + + +@task +def echo(message): + print(f"Received message: {message}") + + +if __name__ == "__main__": + # Run the task once, for local debug + # echo("hello") + + # Run as a daemon + echo.serve() +``` + +This is the same as the following, the classic way...But more flexible. + ```python import os diff --git a/brq/__init__.py b/brq/__init__.py index be0f5e5..d1b610a 100644 --- a/brq/__init__.py +++ b/brq/__init__.py @@ -3,3 +3,19 @@ __author__ = "wh1isper" __email__ = "9573586@qq.com" __version__ = "0.3.9.dev0" + + +from .configs import BrqConfig +from .consumer import Consumer +from .decorator import task +from .producer import Producer +from .tools import get_redis_client, get_redis_url + +__all__ = [ + "task", + "Consumer", + "Producer", + "get_redis_client", + "get_redis_url", + "BrqConfig", +] diff --git a/brq/configs.py b/brq/configs.py new file mode 100644 index 0000000..63d4f41 --- /dev/null +++ b/brq/configs.py @@ -0,0 +1,73 @@ +import os +import uuid +from contextlib import asynccontextmanager +from typing import Any, AsyncGenerator + +import redis.asyncio as redis +from pydantic import BaseModel +from pydantic_settings import BaseSettings, SettingsConfigDict + +from brq.tools import get_redis_client, get_redis_url + + +class Settings(BaseSettings): + model_config = SettingsConfigDict( + env_prefix="brq_", + env_file=".env", + env_nested_delimiter="__", + env_file_encoding="utf-8", + case_sensitive=False, + ) + + +class RedisSettingsMixin: + redis_host: str = "localhost" + redis_port: int = 6379 + redis_db: int = 0 + redis_cluster: bool = False + redis_tls: bool = False + redis_username: str = "" + redis_password: str = "" + + @property + def redis_url(self) -> str: + return get_redis_url( + host=self.redis_host, + port=self.redis_port, + db=self.redis_db, + cluster=self.redis_cluster, + tls=self.redis_tls, + username=self.redis_username, + password=self.redis_password, + ) + + @asynccontextmanager + async def open_redis_client( + self, + ) -> AsyncGenerator[Any, redis.Redis | redis.RedisCluster]: + async with get_redis_client(self.redis_url) as redis_client: + yield redis_client + + +class BrqConfig(Settings, RedisSettingsMixin): + redis_key_prefix: str = "brq" + redis_key_seperator: str = ":" + + producer_max_message_length: int = 1000 + + consumer_group_name: str = "default-workers" + consumer_identifier: str = uuid.uuid4().hex + consumer_count_per_fetch: int = 1 + consumer_block_time: int = 1 + consumer_expire_time: int = 60 * 60 + consumer_process_timeout: int = 60 + consumer_retry_lock_time: int = 300 + consumer_retry_cooldown_time: int = 60 + consumer_enable_enque_deferred_job: bool = True + consumer_enable_reprocess_timeout_job: bool = True + consumer_enable_dead_queue: bool = True + consumer_max_message_len: int = 1000 + consumer_delete_message_after_process: bool = False + consumer_run_parallel: bool = False + + daemon_concurrency: int = 1 diff --git a/brq/decorator.py b/brq/decorator.py index e69de29..334ee14 100644 --- a/brq/decorator.py +++ b/brq/decorator.py @@ -0,0 +1,89 @@ +import asyncio +from functools import partial +from typing import Awaitable, Callable + +from anyio import CapacityLimiter + +from brq.configs import BrqConfig +from brq.consumer import Consumer +from brq.daemon import Daemon +from brq.log import logger +from brq.tools import ensure_awaitable + + +class BrqTaskWrapper: + def __init__( + self, + func: Callable | Awaitable, + config: BrqConfig, + register_function_name: str | None = None, + ): + self._func = func + self.config = config + self.register_function_name = register_function_name or func.__name__ + + def __call__(self, *args, **kwargs): + return self._func(*args, **kwargs) + + def serve(self): + asyncio.run(self._serve()) + + async def _serve(self): + async with self.config.open_redis_client() as async_redis_client: + awaitable_function = ensure_awaitable( + self._func, + limiter=CapacityLimiter(total_tokens=self.config.daemon_concurrency), + ) + consumer_builder = partial( + Consumer, + redis_prefix=self.config.redis_key_prefix, + redis_seperator=self.config.redis_key_seperator, + redis=async_redis_client, + awaitable_function=awaitable_function, + register_function_name=self.register_function_name, + group_name=self.config.consumer_group_name, + consumer_identifier=self.config.consumer_identifier, + count_per_fetch=self.config.consumer_count_per_fetch, + block_time=self.config.consumer_block_time, + expire_time=self.config.consumer_expire_time, + process_timeout=self.config.consumer_process_timeout, + retry_lock_time=self.config.consumer_retry_lock_time, + retry_cooldown_time=self.config.consumer_retry_cooldown_time, + enable_enque_deferred_job=self.config.consumer_enable_enque_deferred_job, + enable_reprocess_timeout_job=self.config.consumer_enable_reprocess_timeout_job, + enable_dead_queue=self.config.consumer_enable_dead_queue, + max_message_len=self.config.consumer_max_message_len, + delete_message_after_process=self.config.consumer_delete_message_after_process, + run_parallel=self.config.consumer_run_parallel, + ) + daemon = Daemon(*[consumer_builder() for _ in range(self.config.daemon_concurrency)]) + await daemon.run_forever() + + +def task( + _func=None, + *, + config: BrqConfig | None = None, + register_function_name: str | None = None, +): + if not config: + logger.info("Initializing config from environment variables and .env file") + config = BrqConfig() + else: + logger.info("Using custom config") + config = config + + def _wrapper(func, config, register_function_name): + if not config: + logger.info("Initializing config from environment variables and .env file") + config = BrqConfig() + else: + logger.info("Using custom config") + config = config + + return BrqTaskWrapper(func, config, register_function_name) + + if _func is None: + return _wrapper + else: + return _wrapper(_func, config=config, register_function_name=register_function_name) diff --git a/brq/tools.py b/brq/tools.py index 6b74baf..6d66052 100644 --- a/brq/tools.py +++ b/brq/tools.py @@ -1,10 +1,14 @@ import asyncio import contextlib +import functools +import inspect from contextlib import asynccontextmanager from datetime import datetime from typing import Any, AsyncGenerator +import anyio import redis.asyncio as redis +from anyio import to_thread from brq.log import logger @@ -75,3 +79,17 @@ async def get_redis_client( yield redis_client finally: await redis_client.aclose() + + +def ensure_awaitable(func, limiter=None): + if inspect.iscoroutinefunction(func): + return func + + @functools.wraps(func) + async def wrapper(*args, **kwargs): + nonlocal func + if kwargs: + func = functools.partial(func, **kwargs) + return await anyio.to_thread.run_sync(func, *args, limiter=limiter) + + return wrapper diff --git a/examples/decorator/README.md b/examples/decorator/README.md new file mode 100644 index 0000000..6f783df --- /dev/null +++ b/examples/decorator/README.md @@ -0,0 +1,5 @@ +## Echo example + +Use [start-redis.sh](../../dev/start-redis.sh) to start redis in docker. + +And run `python producer.py` and `python consumer.py` in different terminals at the same time. diff --git a/examples/decorator/consumer.py b/examples/decorator/consumer.py new file mode 100644 index 0000000..9a6c246 --- /dev/null +++ b/examples/decorator/consumer.py @@ -0,0 +1,14 @@ +from brq import task + + +@task +def echo(message): + print(f"Received message: {message}") + + +if __name__ == "__main__": + # Run the task once, for local debug + # echo("hello") + + # Run as a daemon + echo.serve() diff --git a/examples/decorator/producer.py b/examples/decorator/producer.py new file mode 100644 index 0000000..028dfdf --- /dev/null +++ b/examples/decorator/producer.py @@ -0,0 +1,21 @@ +import os + +from brq.configs import BrqConfig +from brq.producer import Producer + + +async def main(): + config = BrqConfig() + async with config.open_redis_client() as async_redis_client: + await Producer( + async_redis_client, + redis_prefix=config.redis_key_prefix, + redis_seperator=config.redis_key_seperator, + max_message_len=config.producer_max_message_length, + ).run_job("echo", ["hello"]) + + +if __name__ == "__main__": + import asyncio + + asyncio.run(main()) diff --git a/examples/echo/README.md b/examples/echo/README.md index 4c7651d..6f783df 100644 --- a/examples/echo/README.md +++ b/examples/echo/README.md @@ -1,3 +1,5 @@ ## Echo example -Just run `python producer.py` and `python consumer.py` in different terminals at the same time. +Use [start-redis.sh](../../dev/start-redis.sh) to start redis in docker. + +And run `python producer.py` and `python consumer.py` in different terminals at the same time. diff --git a/pyproject.toml b/pyproject.toml index 62ae1b6..f43ac0e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,7 +7,14 @@ name = "brq" description = "brq" keywords = ["brq"] requires-python = ">=3.10" -dependencies = ["pydantic>=2, <3", "redis>=5.0, <6", "loguru", "click"] +dependencies = [ + "pydantic>=2, <3", + "redis>=5.0, <6", + "loguru", + "click", + "pydantic-settings", + "anyio", +] dynamic = ["version"] classifiers = [ "Programming Language :: Python :: 3",