From 25571ad9df00aaadeb780cc65758777a729bb01e Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Thu, 28 Sep 2023 17:29:32 +0530 Subject: [PATCH 1/2] docker/api: use `redis` package Drop `aioredis` package as the project has been archived and it is now a part of `redis` package. Use `redis` directly. Signed-off-by: Jeny Sadadia --- docker/api/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/api/requirements.txt b/docker/api/requirements.txt index 5ff9a01b..ef0d243e 100644 --- a/docker/api/requirements.txt +++ b/docker/api/requirements.txt @@ -1,4 +1,3 @@ -aioredis[hiredis]==2.0.0 cloudevents==1.9.0 fastapi[all]==0.68.1 fastapi-pagination==0.9.3 @@ -10,3 +9,4 @@ motor==2.5.1 pymongo-migrate==0.11.0 pyyaml==5.3.1 fastapi-versioning==0.10.0 +redis==5.0.1 From 4d806a8ab1148470e75c137a495187cb63c966be Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Thu, 28 Sep 2023 17:30:42 +0530 Subject: [PATCH 2/2] api.pubsub: integrate `Redis List` Instead of broadcasting all the messages through Redis PubSub, use `Redis List` data structure to manage a queue of messages. `redis.lpush` is used to push message on a specific list in `publish` handler. `redis.blpop` is used for blocking pop operation in `listen` handler. Signed-off-by: Jeny Sadadia --- api/pubsub.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/api/pubsub.py b/api/pubsub.py index 7818e2f1..6445d168 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -7,9 +7,10 @@ import asyncio -import aioredis from cloudevents.http import CloudEvent, to_json from pydantic import BaseModel, BaseSettings, Field +import redis.asyncio as redis +import json class Settings(BaseSettings): @@ -54,7 +55,7 @@ def __init__(self, host=None, db_number=None): host = self._settings.redis_host if db_number is None: db_number = self._settings.redis_db_number - self._redis = aioredis.from_url(f'redis://{host}/{db_number}') + self._redis = redis.from_url(f'redis://{host}/{db_number}') self._subscriptions = {} self._channels = set() self._lock = asyncio.Lock() @@ -123,20 +124,27 @@ async def listen(self, sub_id): """ async with self._lock: sub = self._subscriptions[sub_id] + channel = list(sub.channels.keys())[0].decode() while True: - msg = await sub.get_message( - ignore_subscribe_messages=True, timeout=1.0 - ) - if msg is not None: - return msg + msg = await self._redis.blpop(channel, timeout=1.0) + data = json.loads(msg[1].decode('utf-8')) if msg else None + if data is not None: + return data + + # msg = await sub.get_message( + # ignore_subscribe_messages=True, timeout=1.0 + # ) + # if msg is not None: + # return msg async def publish(self, channel, message): """Publish a message on a channel Publish an arbitrary message asynchronously on a Pub/Sub channel. """ - await self._redis.publish(channel, message) + await self._redis.rpush(channel, message) + # await self._redis.publish(channel, message) async def publish_cloudevent(self, channel, data, attributes=None): """Publish a CloudEvent on a Pub/Sub channel