From 4585e8a99f3850549ec611097fe3aedfd3de17e2 Mon Sep 17 00:00:00 2001 From: Jeny Sadadia Date: Wed, 22 Nov 2023 18:18:11 +0530 Subject: [PATCH] kernelci.cli: add commands for pop and push from unicast list Add `kci event push` command to push event to a given Redis unicast list. Add `kci event pop` command to pop event from the list for a single consumer. Signed-off-by: Jeny Sadadia --- kernelci/api/__init__.py | 8 ++++++++ kernelci/api/helper.py | 4 ++++ kernelci/api/latest.py | 12 ++++++++++++ kernelci/cli/event.py | 36 ++++++++++++++++++++++++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/kernelci/api/__init__.py b/kernelci/api/__init__.py index 38e054539a..1a6184e02a 100644 --- a/kernelci/api/__init__.py +++ b/kernelci/api/__init__.py @@ -194,6 +194,14 @@ def send_event(self, channel: str, data): def receive_event(self, sub_id: int) -> CloudEvent: """Listen and receive an event from a given subscription id""" + @abc.abstractmethod + def push_event(self, list_name: str, data): + """Push an event to a given Redis List""" + + @abc.abstractmethod + def pop_event(self, list_name: str) -> CloudEvent: + """Listen and pop an event from a given List""" + # ----- # Nodes # ----- diff --git a/kernelci/api/helper.py b/kernelci/api/helper.py index db1f94d42a..a822754e65 100644 --- a/kernelci/api/helper.py +++ b/kernelci/api/helper.py @@ -43,6 +43,10 @@ def receive_event_data(self, sub_id): """Receive CloudEvent from Pub/Sub and return its data payload""" return self.api.receive_event(sub_id).data + def pop_event_data(self, list_name): + """Receive CloudEvent from Redis list and return its data payload""" + return self.api.pop_event(list_name).data + def get_node_from_event(self, event_data): """Listen for an event and get the matching node object from it""" return self.api.get_node(event_data['id']) diff --git a/kernelci/api/latest.py b/kernelci/api/latest.py index 38c203c5b6..9c0dd682d5 100644 --- a/kernelci/api/latest.py +++ b/kernelci/api/latest.py @@ -6,6 +6,7 @@ """KernelCI API bindings for the latest version""" import enum +import json from typing import Optional, Sequence from cloudevents.http import from_json @@ -72,6 +73,17 @@ def receive_event(self, sub_id: int): continue return event + def push_event(self, list_name: str, data): + self._post('/'.join(['push', list_name]), data) + + def pop_event(self, list_name: str): + path = '/'.join(['pop', str(list_name)]) + while True: + resp = self._get(path) + data = json.dumps(resp.json()) + event = from_json(data) + return event + def get_node(self, node_id: str) -> dict: return self._get(f'node/{node_id}').json() diff --git a/kernelci/cli/event.py b/kernelci/cli/event.py index 820f4bde41..5368e88398 100644 --- a/kernelci/cli/event.py +++ b/kernelci/cli/event.py @@ -81,3 +81,39 @@ def receive(config, api, indent, sub_id, secrets): click.echo(json.dumps(event, indent=indent)) else: click.echo(event) + + +@kci_event.command(secrets=True) +@click.option('--is-json', help="Parse input data as JSON", is_flag=True) +@Args.config +@Args.api +@click.argument('list_name') +def push(config, api, is_json, list_name, secrets): + """Read some data on stdin and push it as an event on a list""" + configs = kernelci.config.load(config) + api_config = configs['api'][api] + api = kernelci.api.get_api(api_config, secrets.api.token) + data = sys.stdin.read() + if is_json: + data = json.loads(data) + api.push_event(list_name, {'data': data}) + + +@kci_event.command(secrets=True) +@click.argument('list_name') +@Args.config +@Args.api +@Args.indent +def pop(config, api, indent, list_name, secrets): + """Wait and pop an event from a List when received print on stdout""" + configs = kernelci.config.load(config) + api_config = configs['api'][api] + api = kernelci.api.get_api(api_config, secrets.api.token) + helper = kernelci.api.helper.APIHelper(api) + event = helper.pop_event_data(list_name) + if isinstance(event, str): + click.echo(event.strip()) + elif isinstance(event, dict): + click.echo(json.dumps(event, indent=indent)) + else: + click.echo(event)