From 146c35cad23badd44192cee4942f412d05880490 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Thu, 30 Nov 2023 17:16:03 +0100 Subject: [PATCH 01/11] api.models: refactor PublishEvent model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Simplify the PublishEvent model and define it so that only the "data" field is mandatory. "type" and "source" are optional and the API fills them by default if undefined. "attributes" is an optional dict to specify additional event attributes. An additional attribute ("owner") will be automatically added by the API when necessary. Signed-off-by: Ricardo Cañuelo --- api/models.py | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/api/models.py b/api/models.py index b68573b1..8d9f3bec 100644 --- a/api/models.py +++ b/api/models.py @@ -369,22 +369,17 @@ def get_model_from_kind(kind: str): return models[kind] -class PublishAttributes(BaseModel): - """API model for the attributes of a Publish operation""" - type: str = Field( - default='api.kernelci.org', +class PublishEvent(BaseModel): + """API model for the data of a event""" + data: Any = Field( + description="Event payload" + ) + type: Optional[str] = Field( description="Type of the event" ) - source: str = Field( + source: Optional[str] = Field( description="Source of the event" ) - - -class PublishEvent(BaseModel): - """API model for the data of a event""" - attributes: Optional[PublishAttributes] = Field( - description="Event attributes" - ) - data: Dict = Field( - description="Event payload" + attributes: Optional[Dict] = Field( + description="Extra Cloudevents Extension Context Attributes" ) From e75ce0bee3e8a1d82a5986521f7193ccfafa3f24 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Thu, 30 Nov 2023 17:21:11 +0100 Subject: [PATCH 02/11] api.pubsub: add 'user' field to the Subscription model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 
Content-Transfer-Encoding: 8bit This field will associate a Subscription with the user that requested it and will be used later to do server-side filtering of events based on the event owner. Signed-off-by: Ricardo Cañuelo --- api/pubsub.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/pubsub.py b/api/pubsub.py index 9aa05ecd..9fc654fc 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -22,6 +22,10 @@ class Subscription(BaseModel): channel: str = Field( description='Subscription channel name' ) + user: str = Field( + description=("Username of the user that created the " + "subscription (owner)") + ) class PubSub: From 1d3cd7f99fbc60121e0ca71da30e391cbde01de9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Thu, 30 Nov 2023 17:28:08 +0100 Subject: [PATCH 03/11] api.pubsub: rework subscription handling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Keep track of the Subscription object associated with each subscription id so that it can be retrieved later on "listen" operations. This will allow the API to check for the Subscription "user" and filter out unwanted messages if necessary. Signed-off-by: Ricardo Cañuelo --- api/pubsub.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/api/pubsub.py b/api/pubsub.py index 9fc654fc..521cf903 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -53,6 +53,12 @@ def __init__(self, host=None, db_number=None): if db_number is None: db_number = self._settings.redis_db_number self._redis = aioredis.from_url(f'redis://{host}/{db_number}') + # self._subscriptions is a dict that matches a subscription id + # (key) with a Subscription object ('sub') and a redis + # PubSub object ('redis_sub'). For instance: + # {1 : {'sub': <Subscription object>, 'redis_sub': <redis PubSub object>}} + # + # Note that this matching is kept in this dict only. 
self._subscriptions = {} self._channels = set() self._lock = asyncio.Lock() @@ -82,24 +88,24 @@ async def _keep_alive(self): def _update_channels(self): self._channels = set() for sub in self._subscriptions.values(): - for channel in sub.channels.keys(): + for channel in sub['redis_sub'].channels.keys(): self._channels.add(channel.decode()) - async def subscribe(self, channel): + async def subscribe(self, channel, user): """Subscribe to a Pub/Sub channel - Subscribe to a given channel and return a Subscription object - containing the subscription id which can then be used again in other - methods. + Subscribe to a given channel and return a Subscription object. """ sub_id = await self._redis.incr(self.ID_KEY) async with self._lock: - sub = self._redis.pubsub() - self._subscriptions[sub_id] = sub - await sub.subscribe(channel) + redis_sub = self._redis.pubsub() + sub = Subscription(id=sub_id, channel=channel, user=user) + self._subscriptions[sub_id] = {'redis_sub': redis_sub, + 'sub': sub} + await redis_sub.subscribe(channel) self._update_channels() self._start_keep_alive_timer() - return Subscription(id=sub_id, channel=channel) + return sub async def unsubscribe(self, sub_id): """Unsubscribe from a Pub/Sub channel @@ -108,10 +114,9 @@ async def unsubscribe(self, sub_id): in a Subscription object. 
""" async with self._lock: - sub = self._subscriptions[sub_id] - self._subscriptions.pop(sub_id) + sub = self._subscriptions.pop(sub_id) self._update_channels() - await sub.unsubscribe() + await sub['redis_sub'].unsubscribe() async def listen(self, sub_id): """Listen for Pub/Sub messages @@ -123,7 +128,7 @@ async def listen(self, sub_id): sub = self._subscriptions[sub_id] while True: - msg = await sub.get_message( + msg = await sub['redis_sub'].get_message( ignore_subscribe_messages=True, timeout=1.0 ) if msg is not None: From 1d5a27fe332e9d925b8b31a074543b799fd0bdc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Thu, 30 Nov 2023 17:31:27 +0100 Subject: [PATCH 04/11] api.pubsub: rework attribute processing of events MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let the caller specify optional attributes or no attributes at all. Fill in 'type' and 'source' with default values if undefined. Signed-off-by: Ricardo Cañuelo --- api/pubsub.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/api/pubsub.py b/api/pubsub.py index 521cf903..60464281 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -170,10 +170,11 @@ async def publish_cloudevent(self, channel, data, attributes=None): for more details. 
""" if not attributes: - attributes = { - "type": "api.kernelci.org", - "source": self._settings.cloud_events_source, - } + attributes = {} + if not attributes.get('type'): + attributes['type'] = "api.kernelci.org" + if not attributes.get('source'): + attributes['source'] = self._settings.cloud_events_source event = CloudEvent(attributes=attributes, data=data) await self.publish(channel, to_json(event)) From bafbff0987612ae019e9ad89094a38bd3bd8741b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Thu, 30 Nov 2023 17:34:01 +0100 Subject: [PATCH 05/11] api.pubsub: server-side events filtering based on ownership MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit If the event specifies an owner, send it only to the subscriptions created by that user. Signed-off-by: Ricardo Cañuelo --- api/pubsub.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/api/pubsub.py b/api/pubsub.py index 60464281..be7345c1 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -131,8 +131,12 @@ async def listen(self, sub_id): msg = await sub['redis_sub'].get_message( ignore_subscribe_messages=True, timeout=1.0 ) - if msg is not None: - return msg + if msg is None: + continue + msg_data = json.loads(msg['data']) + if 'owner' in msg_data and msg_data['owner'] != sub['sub'].user: + continue + return msg async def publish(self, channel, message): """Publish a message on a channel From 8a2b2457a68298ea1b36cc9cd76ebf5673c80498 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Fri, 1 Dec 2023 07:41:47 +0100 Subject: [PATCH 06/11] api.main: pass authenticated user name to pubsub.subscribe() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo Cañuelo --- api/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/api/main.py b/api/main.py index 2bad43bb..c84aca49 100644 --- a/api/main.py +++ b/api/main.py @@ -610,7 
+610,7 @@ async def put_nodes( @app.post('/subscribe/{channel}', response_model=Subscription) async def subscribe(channel: str, user: User = Depends(get_current_user)): """Subscribe handler for Pub/Sub channel""" - return await pubsub.subscribe(channel) + return await pubsub.subscribe(channel, user.username) @app.post('/unsubscribe/{sub_id}') From b7c8fa8c38de6fccba3126b618e0272c11351ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Fri, 1 Dec 2023 07:43:21 +0100 Subject: [PATCH 07/11] api.main: rework the publish endpoint to include owner and optional attributes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sync the publish endpoint to the current implementation of pubsub.publish_cloudevent(). Add an 'owner' attribute to the event to save the username of the authenticated request user. Signed-off-by: Ricardo Cañuelo --- api/main.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/api/main.py b/api/main.py index c84aca49..02639390 100644 --- a/api/main.py +++ b/api/main.py @@ -646,7 +646,20 @@ async def listen(sub_id: int, user: User = Depends(get_current_user)): async def publish(event: PublishEvent, channel: str, user: User = Depends(get_current_user)): """Publish an event on the provided Pub/Sub channel""" - await pubsub.publish_cloudevent(channel, event.data, event.attributes) + event_dict = PublishEvent.dict(event) + # 1 - Extract data and attributes from the event + # 2 - Add the owner as an extra attribute + # 3 - Collect all the other extra attributes, if available, without + # overwriting any of the standard ones in the dict + data = event_dict.pop('data') + extra_attributes = event_dict.pop("attributes") + attributes = event_dict + attributes['owner'] = user.username + if extra_attributes: + for k in extra_attributes: + if k not in attributes: + attributes[k] = extra_attributes[k] + await pubsub.publish_cloudevent(channel, data, attributes) 
@app.post('/push/{list_name}') From 53b991318ac1d874ca5f9664472149a44682dae5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Fri, 1 Dec 2023 07:47:30 +0100 Subject: [PATCH 08/11] api.main: add "owner" to events for node-insertion endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo Cañuelo --- api/main.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/api/main.py b/api/main.py index 02639390..316e4eaa 100644 --- a/api/main.py +++ b/api/main.py @@ -549,7 +549,10 @@ async def post_node(node: Node, node.owner = current_user.username obj = await db.create(node) data = _get_node_event_data('created', obj) - await pubsub.publish_cloudevent('node', data) + attributes = {} + if data.get('owner', None): + attributes['owner'] = data['owner'] + await pubsub.publish_cloudevent('node', data, attributes) return obj @@ -578,7 +581,10 @@ async def put_node(node_id: str, node: Node, obj = await db.update(node) data = _get_node_event_data('updated', obj) - await pubsub.publish_cloudevent('node', data) + attributes = {} + if data.get('owner', None): + attributes['owner'] = data['owner'] + await pubsub.publish_cloudevent('node', data, attributes) return obj @@ -600,7 +606,10 @@ async def put_nodes( await _set_node_ownership_recursively(user, nodes) obj_list = await db.create_hierarchy(nodes, Node) data = _get_node_event_data('updated', obj_list[0]) - await pubsub.publish_cloudevent('node', data) + attributes = {} + if data.get('owner', None): + attributes['owner'] = data['owner'] + await pubsub.publish_cloudevent('node', data, attributes) return obj_list From cff871a6c9eb6cbe45f710f7fc7efec3d4165fdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Fri, 1 Dec 2023 07:48:50 +0100 Subject: [PATCH 09/11] api: check ownership in unsubscribe endpoint MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 
Let a user unsubscribe only from his/her own subscriptions. Prior to this change, anyone could unsubscribe anybody else from any active subscription. Signed-off-by: Ricardo Cañuelo --- api/main.py | 7 ++++++- api/pubsub.py | 11 +++++++++-- 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/api/main.py b/api/main.py index 316e4eaa..7991dba0 100644 --- a/api/main.py +++ b/api/main.py @@ -626,12 +626,17 @@ async def subscribe(channel: str, user: User = Depends(get_current_user)): async def unsubscribe(sub_id: int, user: User = Depends(get_current_user)): """Unsubscribe handler for Pub/Sub channel""" try: - await pubsub.unsubscribe(sub_id) + await pubsub.unsubscribe(sub_id, user.username) except KeyError as error: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail=f"Subscription id not found: {str(error)}" ) from error + except RuntimeError as error: + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=str(error) + ) from error @app.get('/listen/{sub_id}') diff --git a/api/pubsub.py b/api/pubsub.py index be7345c1..c9e38446 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -107,14 +107,21 @@ async def subscribe(self, channel, user): self._start_keep_alive_timer() return sub - async def unsubscribe(self, sub_id): + async def unsubscribe(self, sub_id, user=None): """Unsubscribe from a Pub/Sub channel Unsubscribe from a channel using the provided subscription id as found in a Subscription object. """ async with self._lock: - sub = self._subscriptions.pop(sub_id) + sub = self._subscriptions[sub_id] + # Only allow a user to unsubscribe its own + # subscriptions. 
One exception: let an anonymous (internal) + # call to this function to unsubscribe any subscription + if user and user != sub['sub'].user: + raise RuntimeError(f"Subscription {sub_id} " + f"not owned by {user}") + self._subscriptions.pop(sub_id) self._update_channels() await sub['redis_sub'].unsubscribe() From 0fcdfd278828eac5d7b122da9b9928bfc235ac45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Fri, 1 Dec 2023 09:52:40 +0100 Subject: [PATCH 10/11] tests: update unit and e2e tests, sync to latest pubsub changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo Cañuelo --- tests/e2e_tests/test_subscribe_handler.py | 6 +++--- tests/unit_tests/conftest.py | 6 ++++-- tests/unit_tests/test_pubsub.py | 4 ++-- tests/unit_tests/test_subscribe_handler.py | 4 ++-- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/e2e_tests/test_subscribe_handler.py b/tests/e2e_tests/test_subscribe_handler.py index 6f27d21b..4413179f 100644 --- a/tests/e2e_tests/test_subscribe_handler.py +++ b/tests/e2e_tests/test_subscribe_handler.py @@ -28,7 +28,7 @@ def test_subscribe_node_channel(test_client): ) pytest.node_channel_subscription_id = response.json()['id'] assert response.status_code == 200 - assert ('id', 'channel') == tuple(response.json().keys()) + assert ('id', 'channel', 'user') == tuple(response.json().keys()) assert response.json().get('channel') == 'node' @@ -51,7 +51,7 @@ def test_subscribe_test_channel(test_client): ) pytest.test_channel_subscription_id = response.json()['id'] assert response.status_code == 200 - assert ('id', 'channel') == tuple(response.json().keys()) + assert ('id', 'channel', 'user') == tuple(response.json().keys()) assert response.json().get('channel') == 'test_channel' @@ -75,5 +75,5 @@ def test_subscribe_user_group_channel(test_client): ) pytest.user_group_channel_subscription_id = response.json()['id'] assert response.status_code == 200 - assert 
('id', 'channel') == tuple(response.json().keys()) + assert ('id', 'channel', 'user') == tuple(response.json().keys()) assert response.json().get('channel') == 'user_group' diff --git a/tests/unit_tests/conftest.py b/tests/unit_tests/conftest.py index 7b2e5690..7a97ec2d 100644 --- a/tests/unit_tests/conftest.py +++ b/tests/unit_tests/conftest.py @@ -28,7 +28,7 @@ ) from api.models import UserGroup from api.user_models import User -from api.pubsub import PubSub +from api.pubsub import PubSub, Subscription BEARER_TOKEN = "Bearer \ eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJib2IifQ.\ @@ -223,8 +223,10 @@ def mock_pubsub_subscriptions(mocker): """Mocks `_redis` and `_subscriptions` member of PubSub class instance""" pubsub = PubSub() redis_mock = fakeredis.aioredis.FakeRedis() + sub = Subscription(id=1, channel='test', user='test') mocker.patch.object(pubsub, '_redis', redis_mock) - subscriptions_mock = dict({1: pubsub._redis.pubsub()}) + subscriptions_mock = dict( + {1: {'sub': sub, 'redis_sub': pubsub._redis.pubsub()}}) mocker.patch.object(pubsub, '_subscriptions', subscriptions_mock) return pubsub diff --git a/tests/unit_tests/test_pubsub.py b/tests/unit_tests/test_pubsub.py index 90ce03e5..481a95b5 100644 --- a/tests/unit_tests/test_pubsub.py +++ b/tests/unit_tests/test_pubsub.py @@ -23,7 +23,7 @@ async def test_subscribe_single_channel(mock_pubsub): PubSub._subscriptions dict should have one entry. This entry's key should be equal 1. 
""" - result = await mock_pubsub.subscribe('CHANNEL') + result = await mock_pubsub.subscribe('CHANNEL', 'test') assert result.channel == 'CHANNEL' assert result.id == 1 assert len(mock_pubsub._subscriptions) == 1 @@ -48,7 +48,7 @@ async def test_subscribe_multiple_channels(mock_pubsub): await mock_pubsub._redis.set(mock_pubsub.ID_KEY, 0) channels = ((1, 'CHANNEL1'), (2, 'CHANNEL2'), (3, 'CHANNEL3')) for expected_id, expected_channel in channels: - result = await mock_pubsub.subscribe(expected_channel) + result = await mock_pubsub.subscribe(expected_channel, 'test') assert result.channel == expected_channel assert result.id == expected_id assert len(mock_pubsub._subscriptions) == 3 diff --git a/tests/unit_tests/test_subscribe_handler.py b/tests/unit_tests/test_subscribe_handler.py index 555cea8a..aaf2614d 100644 --- a/tests/unit_tests/test_subscribe_handler.py +++ b/tests/unit_tests/test_subscribe_handler.py @@ -18,7 +18,7 @@ def test_subscribe_endpoint(mock_subscribe, test_client): HTTP Response Code 200 OK JSON with 'id' and 'channel' keys """ - subscribe = Subscription(id=1, channel='abc') + subscribe = Subscription(id=1, channel='abc', user='test') mock_subscribe.return_value = subscribe response = test_client.post( @@ -29,4 +29,4 @@ def test_subscribe_endpoint(mock_subscribe, test_client): ) print("response.json()", response.json()) assert response.status_code == 200 - assert ('id', 'channel') == tuple(response.json().keys()) + assert ('id', 'channel', 'user') == tuple(response.json().keys()) From fd6d21698111a10b8bb5aa01ce06cf41ba0f0e9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Ca=C3=B1uelo?= Date: Wed, 13 Dec 2023 12:19:25 +0100 Subject: [PATCH 11/11] api.pubsub: only let users listen for their own subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Ricardo Cañuelo --- api/main.py | 2 +- api/pubsub.py | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git 
a/api/main.py b/api/main.py index 7991dba0..f87243e1 100644 --- a/api/main.py +++ b/api/main.py @@ -643,7 +643,7 @@ async def unsubscribe(sub_id: int, user: User = Depends(get_current_user)): async def listen(sub_id: int, user: User = Depends(get_current_user)): """Listen messages from a subscribed Pub/Sub channel""" try: - return await pubsub.listen(sub_id) + return await pubsub.listen(sub_id, user.username) except KeyError as error: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, diff --git a/api/pubsub.py b/api/pubsub.py index c9e38446..93365789 100644 --- a/api/pubsub.py +++ b/api/pubsub.py @@ -125,7 +125,7 @@ async def unsubscribe(self, sub_id, user=None): self._update_channels() await sub['redis_sub'].unsubscribe() - async def listen(self, sub_id): + async def listen(self, sub_id, user=None): """Listen for Pub/Sub messages Listen on a given subscription id asynchronously and return a message @@ -134,6 +134,12 @@ async def listen(self, sub_id): async with self._lock: sub = self._subscriptions[sub_id] + # Only allow a user to listen to its own subscriptions. One + # exception: let an anonymous (internal) call to this function + # to listen to any subscription + if user and user != sub['sub'].user: + raise RuntimeError(f"Subscription {sub_id} " + f"not owned by {user}") while True: msg = await sub['redis_sub'].get_message( ignore_subscribe_messages=True, timeout=1.0