From 6b8a603151f5003a0ac55a3a69e0b961cca584dd Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Mon, 20 Nov 2023 16:56:24 +0530 Subject: [PATCH 1/4] docker/api: drop `aioredis` and use `redis` package instead Drop `aioredis` package as the project has been archived and it is now a part of `redis` package. Add `redis` to `requirementes.txt` file. To maintain compatibilities between packages, update `fakeredis` version in `requirements-tests.txt`. Signed-off-by: Jeny Sadadia --- api/pubsub.py | 2 +- docker/api/requirements-tests.txt | 2 +- docker/api/requirements.txt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/api/pubsub.py b/api/pubsub.py index 5ea221af..9c305fa4 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -7,7 +7,7 @@ import asyncio -import aioredis +from redis import asyncio as aioredis from cloudevents.http import CloudEvent, to_json from pydantic import BaseModel, Field from .config import PubSubSettings diff --git a/docker/api/requirements-tests.txt b/docker/api/requirements-tests.txt index 04390e81..0f0a16c3 100644 --- a/docker/api/requirements-tests.txt +++ b/docker/api/requirements-tests.txt @@ -1,5 +1,5 @@ -r requirements.txt -fakeredis==1.7.0 +fakeredis==2.20.0 pytest==6.2.5 pytest-asyncio==0.16.0 pytest-dependency==0.5.1 diff --git a/docker/api/requirements.txt b/docker/api/requirements.txt index b9694336..52106284 100644 --- a/docker/api/requirements.txt +++ b/docker/api/requirements.txt @@ -1,4 +1,3 @@ -aioredis[hiredis]==2.0.0 cloudevents==1.9.0 fastapi[all]==0.68.1 fastapi-pagination==0.9.3 @@ -11,4 +10,5 @@ pydantic==1.10.5 pymongo-migrate==0.11.0 python-jose[cryptography]==3.3.0 pyyaml==5.3.1 +redis==5.0.1 uvicorn[standard]==0.13.4 From 8016d93719845530ab66159379ee297df20f0029 Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Wed, 22 Nov 2023 11:21:38 +0530 Subject: [PATCH 2/4] api.pubsub: enable support for push and pop on Redis List Add methods to `PubSub` class to push messages to Redis List, pop message from the list, and push `CloudEvent` to the list. Signed-off-by: Jeny Sadadia --- api/pubsub.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/api/pubsub.py b/api/pubsub.py index 9c305fa4..9aa05ecd 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -7,6 +7,7 @@ import asyncio +import json from redis import asyncio as aioredis from cloudevents.http import CloudEvent, to_json from pydantic import BaseModel, Field @@ -131,6 +132,25 @@ async def publish(self, channel, message): """ await self._redis.publish(channel, message) + async def push(self, list_name, message): + """Push a message onto the tail of a list + + Push an arbitrary message asynchronously on a List. + """ + await self._redis.rpush(list_name, message) + + async def pop(self, list_name): + """Pop a message from a list + + Listen on a given list asynchronously and get a message + when received. Only a single consumer will receive the message. + """ + while True: + msg = await self._redis.blpop(list_name, timeout=1.0) + data = json.loads(msg[1].decode('utf-8')) if msg else None + if data is not None: + return data + async def publish_cloudevent(self, channel, data, attributes=None): """Publish a CloudEvent on a Pub/Sub channel @@ -147,3 +167,19 @@ async def publish_cloudevent(self, channel, data, attributes=None): } event = CloudEvent(attributes=attributes, data=data) await self.publish(channel, to_json(event)) + + async def push_cloudevent(self, list_name, data, attributes=None): + """Push a CloudEvent on a list + + Push a CloudEvent asynchronously on a given list using the + provided data and optional attributes. The data is the payload of the + message. The attributes are the type and source of event which will be + populated by default if not provided. + """ + if not attributes: + attributes = { + "type": "api.kernelci.org", + "source": self._settings.cloud_events_source, + } + event = CloudEvent(attributes=attributes, data=data) + await self.push(list_name, to_json(event)) From 143701ad9852a724553ef2376042c2a2e6c336ce Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Wed, 22 Nov 2023 11:24:18 +0530 Subject: [PATCH 3/4] api.main: add endpoints for Redis List push and pop In order to enable support for unicast messages, introduce endpoints `/push` and `/pop` for pushing a message to a Redis list and popping a message from the list respectively. This is to serve load-balancing purpose by sending message to a single consumer. Signed-off-by: Jeny Sadadia --- api/main.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/api/main.py b/api/main.py index 5533fcfd..04fc56a4 100644 --- a/api/main.py +++ b/api/main.py @@ -617,6 +617,21 @@ async def publish(raw: dict, channel: str, await pubsub.publish_cloudevent(channel, data, attributes) +@app.post('/push/{list_name}') +async def push(raw: dict, list_name: str, + user: User = Depends(get_current_user)): + """Push a message on the provided list""" + attributes = dict(raw) + data = attributes.pop('data') + await pubsub.push_cloudevent(list_name, data, attributes) + + +@app.get('/pop/{list_name}') +async def pop(list_name: str, user: User = Depends(get_current_user)): + """Pop a message from a given list""" + return await pubsub.pop(list_name) + + # ----------------------------------------------------------------------------- # Regression From 5c17c884f79b4f29c52edd9fa8c2532a24d2f16b Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Tue, 28 Nov 2023 12:59:55 +0530 Subject: [PATCH 4/4] unit_tests/test_pubsub: reset `PubSub.ID_KEY` value Somehow after changing version of `fakeredis`, redis mock stays the same for all the unit tests even if `mock_pubsub` fixture has the default scope `function`. That's why value of `PubSub._redis.ID_KEY` becomes global among all the tests and `pubsub.subscribe` increments it based on the value from the previous test run. Fix the issue by resetting `ID_KEY` value to get subscription ID starting from 1 in `test_subscribe_multiple_channels`. Signed-off-by: Jeny Sadadia --- tests/unit_tests/test_pubsub.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit_tests/test_pubsub.py b/tests/unit_tests/test_pubsub.py index 3a7e6a05..90ce03e5 100644 --- a/tests/unit_tests/test_pubsub.py +++ b/tests/unit_tests/test_pubsub.py @@ -44,6 +44,8 @@ async def test_subscribe_multiple_channels(mock_pubsub): PubSub._subscriptions dict should have 2 entries. This entries' keys should be 1, 2, and 3. """ + # Reset `ID_KEY` value to get subscription ID starting from 1 + await mock_pubsub._redis.set(mock_pubsub.ID_KEY, 0) channels = ((1, 'CHANNEL1'), (2, 'CHANNEL2'), (3, 'CHANNEL3')) for expected_id, expected_channel in channels: result = await mock_pubsub.subscribe(expected_channel)