Skip to content

Commit

Permalink
Imp decorator and change README
Browse files Browse the repository at this point in the history
  • Loading branch information
Wh1isper committed Sep 4, 2024
1 parent 550d1b3 commit 9449991
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 14 deletions.
50 changes: 38 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi
- Multiple consumers in one consumer group
- No scheduler needed, consumer handles itself

## Configuration

If using `BrqConfig`, you can use a `.env` file and environment variables to configure brq. The prefix of environment variables is `BRQ_`.

> For example, `BRQ_REDIS_PORT=6379 python consumer.py` for specifying redis port.
See [configs](./brq/configs.py) for more details.

## Echo job overview

### Producer
Expand All @@ -40,21 +48,18 @@ Redis >= 6.2, tested with latest redis 6/7 docker image. Recommended to use redi
import os

from brq.producer import Producer
from brq.tools import get_redis_client, get_redis_url
from brq.configs import BrqConfig


async def main():
redis_url = get_redis_url(
host=os.getenv("REDIS_HOST", "localhost"),
port=int(os.getenv("REDIS_PORT", 6379)),
db=int(os.getenv("REDIS_DB", 0)),
cluster=bool(os.getenv("REDIS_CLUSTER", "false") in ["True", "true", "1"]),
tls=bool(os.getenv("REDIS_TLS", "false") in ["True", "true", "1"]),
username=os.getenv("REDIS_USERNAME", ""),
password=os.getenv("REDIS_PASSWORD", ""),
)
async with get_redis_client(redis_url) as async_redis_client:
await Producer(async_redis_client).run_job("echo", ["hello"])
config = BrqConfig()
async with config.open_redis_client() as async_redis_client:
await Producer(
async_redis_client,
redis_prefix=config.redis_key_prefix,
redis_seperator=config.redis_key_seperator,
max_message_len=config.producer_max_message_length,
).run_job("echo", ["hello"])


if __name__ == "__main__":
Expand All @@ -65,6 +70,27 @@ if __name__ == "__main__":

### Consumer

The only thing you need is `@task`, and the target function can be `sync` or `async` and `sync` function will be converted to `async` function and run in a thread automatically.

```python
from brq import task


@task
def echo(message):
print(f"Received message: {message}")


if __name__ == "__main__":
# Run the task once, for local debug
# echo("hello")

# Run as a daemon
echo.serve()
```

This is the same as the following, the classic way...But more flexible.

```python
import os

Expand Down
16 changes: 16 additions & 0 deletions brq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,19 @@
__author__ = "wh1isper"
__email__ = "[email protected]"
__version__ = "0.3.9.dev0"


from .configs import BrqConfig
from .consumer import Consumer
from .decorator import task
from .producer import Producer
from .tools import get_redis_client, get_redis_url

__all__ = [
"task",
"Consumer",
"Producer",
"get_redis_client",
"get_redis_url",
"BrqConfig",
]
73 changes: 73 additions & 0 deletions brq/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import os
import uuid
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator

import redis.asyncio as redis
from pydantic import BaseModel
from pydantic_settings import BaseSettings, SettingsConfigDict

from brq.tools import get_redis_client, get_redis_url


class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_prefix="brq_",
env_file=".env",
env_nested_delimiter="__",
env_file_encoding="utf-8",
case_sensitive=False,
)


class RedisSettingsMixin:
redis_host: str = "localhost"
redis_port: int = 6379
redis_db: int = 0
redis_cluster: bool = False
redis_tls: bool = False
redis_username: str = ""
redis_password: str = ""

@property
def redis_url(self) -> str:
return get_redis_url(

Check warning on line 34 in brq/configs.py

View check run for this annotation

Codecov / codecov/patch

brq/configs.py#L34

Added line #L34 was not covered by tests
host=self.redis_host,
port=self.redis_port,
db=self.redis_db,
cluster=self.redis_cluster,
tls=self.redis_tls,
username=self.redis_username,
password=self.redis_password,
)

@asynccontextmanager
async def open_redis_client(
self,
) -> AsyncGenerator[Any, redis.Redis | redis.RedisCluster]:
async with get_redis_client(self.redis_url) as redis_client:
yield redis_client

Check warning on line 49 in brq/configs.py

View check run for this annotation

Codecov / codecov/patch

brq/configs.py#L48-L49

Added lines #L48 - L49 were not covered by tests


class BrqConfig(Settings, RedisSettingsMixin):
redis_key_prefix: str = "brq"
redis_key_seperator: str = ":"

producer_max_message_length: int = 1000

consumer_group_name: str = "default-workers"
consumer_identifier: str = uuid.uuid4().hex
consumer_count_per_fetch: int = 1
consumer_block_time: int = 1
consumer_expire_time: int = 60 * 60
consumer_process_timeout: int = 60
consumer_retry_lock_time: int = 300
consumer_retry_cooldown_time: int = 60
consumer_enable_enque_deferred_job: bool = True
consumer_enable_reprocess_timeout_job: bool = True
consumer_enable_dead_queue: bool = True
consumer_max_message_len: int = 1000
consumer_delete_message_after_process: bool = False
consumer_run_parallel: bool = False

daemon_concurrency: int = 1
89 changes: 89 additions & 0 deletions brq/decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import asyncio
from functools import partial
from typing import Awaitable, Callable

from anyio import CapacityLimiter

from brq.configs import BrqConfig
from brq.consumer import Consumer
from brq.daemon import Daemon
from brq.log import logger
from brq.tools import ensure_awaitable


class BrqTaskWrapper:
def __init__(
self,
func: Callable | Awaitable,
config: BrqConfig,
register_function_name: str | None = None,
):
self._func = func
self.config = config
self.register_function_name = register_function_name or func.__name__

Check warning on line 23 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L21-L23

Added lines #L21 - L23 were not covered by tests

def __call__(self, *args, **kwargs):
return self._func(*args, **kwargs)

Check warning on line 26 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L26

Added line #L26 was not covered by tests

def serve(self):
asyncio.run(self._serve())

Check warning on line 29 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L29

Added line #L29 was not covered by tests

async def _serve(self):
async with self.config.open_redis_client() as async_redis_client:
awaitable_function = ensure_awaitable(

Check warning on line 33 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L32-L33

Added lines #L32 - L33 were not covered by tests
self._func,
limiter=CapacityLimiter(total_tokens=self.config.daemon_concurrency),
)
consumer_builder = partial(

Check warning on line 37 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L37

Added line #L37 was not covered by tests
Consumer,
redis_prefix=self.config.redis_key_prefix,
redis_seperator=self.config.redis_key_seperator,
redis=async_redis_client,
awaitable_function=awaitable_function,
register_function_name=self.register_function_name,
group_name=self.config.consumer_group_name,
consumer_identifier=self.config.consumer_identifier,
count_per_fetch=self.config.consumer_count_per_fetch,
block_time=self.config.consumer_block_time,
expire_time=self.config.consumer_expire_time,
process_timeout=self.config.consumer_process_timeout,
retry_lock_time=self.config.consumer_retry_lock_time,
retry_cooldown_time=self.config.consumer_retry_cooldown_time,
enable_enque_deferred_job=self.config.consumer_enable_enque_deferred_job,
enable_reprocess_timeout_job=self.config.consumer_enable_reprocess_timeout_job,
enable_dead_queue=self.config.consumer_enable_dead_queue,
max_message_len=self.config.consumer_max_message_len,
delete_message_after_process=self.config.consumer_delete_message_after_process,
run_parallel=self.config.consumer_run_parallel,
)
daemon = Daemon(*[consumer_builder() for _ in range(self.config.daemon_concurrency)])
await daemon.run_forever()

Check warning on line 60 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L59-L60

Added lines #L59 - L60 were not covered by tests


def task(
_func=None,
*,
config: BrqConfig | None = None,
register_function_name: str | None = None,
):
if not config:
logger.info("Initializing config from environment variables and .env file")
config = BrqConfig()

Check warning on line 71 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L69-L71

Added lines #L69 - L71 were not covered by tests
else:
logger.info("Using custom config")
config = config

Check warning on line 74 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L73-L74

Added lines #L73 - L74 were not covered by tests

def _wrapper(func, config, register_function_name):
if not config:
logger.info("Initializing config from environment variables and .env file")
config = BrqConfig()

Check warning on line 79 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L76-L79

Added lines #L76 - L79 were not covered by tests
else:
logger.info("Using custom config")
config = config

Check warning on line 82 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L81-L82

Added lines #L81 - L82 were not covered by tests

return BrqTaskWrapper(func, config, register_function_name)

Check warning on line 84 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L84

Added line #L84 was not covered by tests

if _func is None:
return _wrapper

Check warning on line 87 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L86-L87

Added lines #L86 - L87 were not covered by tests
else:
return _wrapper(_func, config=config, register_function_name=register_function_name)

Check warning on line 89 in brq/decorator.py

View check run for this annotation

Codecov / codecov/patch

brq/decorator.py#L89

Added line #L89 was not covered by tests
18 changes: 18 additions & 0 deletions brq/tools.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import asyncio
import contextlib
import functools
import inspect
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, AsyncGenerator

import anyio
import redis.asyncio as redis
from anyio import to_thread

from brq.log import logger

Expand Down Expand Up @@ -75,3 +79,17 @@ async def get_redis_client(
yield redis_client
finally:
await redis_client.aclose()


def ensure_awaitable(func, limiter=None):
if inspect.iscoroutinefunction(func):
return func

@functools.wraps(func)
async def wrapper(*args, **kwargs):
nonlocal func
if kwargs:
func = functools.partial(func, **kwargs)
return await anyio.to_thread.run_sync(func, *args, limiter=limiter)

return wrapper
5 changes: 5 additions & 0 deletions examples/decorator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
## Echo example

Use [start-redis.sh](../../dev/start-redis.sh) to start redis in docker.

And run `python producer.py` and `python consumer.py` in different terminals at the same time.
14 changes: 14 additions & 0 deletions examples/decorator/consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from brq import task


@task
def echo(message):
print(f"Received message: {message}")


if __name__ == "__main__":
# Run the task once, for local debug
# echo("hello")

# Run as a daemon
echo.serve()
21 changes: 21 additions & 0 deletions examples/decorator/producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import os

from brq.configs import BrqConfig
from brq.producer import Producer


async def main():
config = BrqConfig()
async with config.open_redis_client() as async_redis_client:
await Producer(
async_redis_client,
redis_prefix=config.redis_key_prefix,
redis_seperator=config.redis_key_seperator,
max_message_len=config.producer_max_message_length,
).run_job("echo", ["hello"])


if __name__ == "__main__":
import asyncio

asyncio.run(main())
4 changes: 3 additions & 1 deletion examples/echo/README.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## Echo example

Just run `python producer.py` and `python consumer.py` in different terminals at the same time.
Use [start-redis.sh](../../dev/start-redis.sh) to start redis in docker.

And run `python producer.py` and `python consumer.py` in different terminals at the same time.
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@ name = "brq"
description = "brq"
keywords = ["brq"]
requires-python = ">=3.10"
dependencies = ["pydantic>=2, <3", "redis>=5.0, <6", "loguru", "click"]
dependencies = [
"pydantic>=2, <3",
"redis>=5.0, <6",
"loguru",
"click",
"pydantic-settings",
"anyio",
]
dynamic = ["version"]
classifiers = [
"Programming Language :: Python :: 3",
Expand Down

0 comments on commit 9449991

Please sign in to comment.