Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable support for unicast messages #417

Merged
merged 4 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions api/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,21 @@ async def publish(raw: dict, channel: str,
await pubsub.publish_cloudevent(channel, data, attributes)


@app.post('/push/{list_name}')
async def push(raw: dict, list_name: str,
user: User = Depends(get_current_user)):
"""Push a message on the provided list"""
attributes = dict(raw)
data = attributes.pop('data')
await pubsub.push_cloudevent(list_name, data, attributes)


@app.get('/pop/{list_name}')
async def pop(list_name: str, user: User = Depends(get_current_user)):
"""Pop a message from a given list"""
return await pubsub.pop(list_name)


# -----------------------------------------------------------------------------
# Regression

Expand Down
38 changes: 37 additions & 1 deletion api/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@

import asyncio

import aioredis
import json
from redis import asyncio as aioredis
from cloudevents.http import CloudEvent, to_json
from pydantic import BaseModel, Field
from .config import PubSubSettings
Expand Down Expand Up @@ -131,6 +132,25 @@ async def publish(self, channel, message):
"""
await self._redis.publish(channel, message)

async def push(self, list_name, message):
"""Push a message onto the tail of a list

Push an arbitrary message asynchronously on a List.
"""
await self._redis.rpush(list_name, message)

async def pop(self, list_name):
"""Pop a message from a list

Listen on a given list asynchronously and get a message
when received. Only a single consumer will receive the message.
"""
while True:
msg = await self._redis.blpop(list_name, timeout=1.0)
data = json.loads(msg[1].decode('utf-8')) if msg else None
if data is not None:
return data

async def publish_cloudevent(self, channel, data, attributes=None):
"""Publish a CloudEvent on a Pub/Sub channel

Expand All @@ -147,3 +167,19 @@ async def publish_cloudevent(self, channel, data, attributes=None):
}
event = CloudEvent(attributes=attributes, data=data)
await self.publish(channel, to_json(event))

async def push_cloudevent(self, list_name, data, attributes=None):
"""Push a CloudEvent on a list

Push a CloudEvent asynchronously on a given list using the
provided data and optional attributes. The data is the payload of the
message. The attributes are the type and source of event which will be
populated by default if not provided.
"""
if not attributes:
attributes = {
"type": "api.kernelci.org",
"source": self._settings.cloud_events_source,
}
event = CloudEvent(attributes=attributes, data=data)
await self.push(list_name, to_json(event))
2 changes: 1 addition & 1 deletion docker/api/requirements-tests.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-r requirements.txt
fakeredis==1.7.0
fakeredis==2.20.0
pytest==6.2.5
pytest-asyncio==0.16.0
pytest-dependency==0.5.1
Expand Down
2 changes: 1 addition & 1 deletion docker/api/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
aioredis[hiredis]==2.0.0
cloudevents==1.9.0
fastapi[all]==0.68.1
fastapi-pagination==0.9.3
Expand All @@ -11,4 +10,5 @@ pydantic==1.10.5
pymongo-migrate==0.11.0
python-jose[cryptography]==3.3.0
pyyaml==5.3.1
redis==5.0.1
uvicorn[standard]==0.13.4
2 changes: 2 additions & 0 deletions tests/unit_tests/test_pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
PubSub._subscriptions dict should have 2 entries. This entries'
keys should be 1, 2, and 3.
"""
# Reset `ID_KEY` value to get subscription ID starting from 1
await mock_pubsub._redis.set(mock_pubsub.ID_KEY, 0)
channels = ((1, 'CHANNEL1'), (2, 'CHANNEL2'), (3, 'CHANNEL3'))
for expected_id, expected_channel in channels:
result = await mock_pubsub.subscribe(expected_channel)
Expand Down Expand Up @@ -91,20 +93,20 @@
published in the channel by the redis publisher.

Expected Results:
Validate that a json is sent to the channel and assert the json values from

Check warning on line 96 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

line too long (83 > 79 characters)
data and attributes parameters in Pubsub.publish_cloudevent(). There's no

Check warning on line 97 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

line too long (81 > 79 characters)
return value, but a json to be published in a channel.
"""

data = 'validate json'
attributes = { "specversion": "1.0", "id": "6878b661-96dc-4e93-8c92-26eb9ff8db64",

Check warning on line 102 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

whitespace after '{'

Check warning on line 102 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

line too long (87 > 79 characters)
"source": "https://api.kernelci.org/", "type": "api.kernelci.org",

Check warning on line 103 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

continuation line under-indented for visual indent
"time": "2022-01-31T21:29:29.675593+00:00"}

Check warning on line 104 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

continuation line under-indented for visual indent

await mock_pubsub_publish.publish_cloudevent('CHANNEL1', data, attributes)

expected_json = str.encode('{"specversion": "1.0", '\

Check warning on line 108 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

the backslash is redundant between brackets
'"id": "6878b661-96dc-4e93-8c92-26eb9ff8db64", "source": "https://api.kernelci.org/", '\

Check warning on line 109 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

continuation line under-indented for visual indent

Check warning on line 109 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

line too long (92 > 79 characters)

Check warning on line 109 in tests/unit_tests/test_pubsub.py

View workflow job for this annotation

GitHub Actions / Lint

the backslash is redundant between brackets
'"type": "api.kernelci.org", "time": "2022-01-31T21:29:29.675593+00:00", '\
'"data": "validate json"}')

Expand Down
Loading