Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kci event commands for unicast list push and pop #2206

Merged
merged 1 commit into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions kernelci/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,14 @@ def send_event(self, channel: str, data):
def receive_event(self, sub_id: int) -> CloudEvent:
"""Listen and receive an event from a given subscription id"""

@abc.abstractmethod
def push_event(self, list_name: str, data):
"""Push an event to a given Redis List"""

@abc.abstractmethod
def pop_event(self, list_name: str) -> CloudEvent:
"""Listen and pop an event from a given List"""

# -----
# Nodes
# -----
Expand Down
4 changes: 4 additions & 0 deletions kernelci/api/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def receive_event_data(self, sub_id):
"""Receive CloudEvent from Pub/Sub and return its data payload"""
return self.api.receive_event(sub_id).data

def pop_event_data(self, list_name):
"""Receive CloudEvent from Redis list and return its data payload"""
return self.api.pop_event(list_name).data

def get_node_from_event(self, event_data):
"""Listen for an event and get the matching node object from it"""
return self.api.get_node(event_data['id'])
Expand Down
12 changes: 12 additions & 0 deletions kernelci/api/latest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""KernelCI API bindings for the latest version"""

import enum
import json
from typing import Optional, Sequence

from cloudevents.http import from_json
Expand Down Expand Up @@ -72,6 +73,17 @@ def receive_event(self, sub_id: int):
continue
return event

def push_event(self, list_name: str, data):
self._post('/'.join(['push', list_name]), data)

def pop_event(self, list_name: str):
path = '/'.join(['pop', str(list_name)])
while True:
resp = self._get(path)
data = json.dumps(resp.json())
event = from_json(data)
return event

def get_node(self, node_id: str) -> dict:
return self._get(f'node/{node_id}').json()

Expand Down
36 changes: 36 additions & 0 deletions kernelci/cli/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@

@kci_event.command(secrets=True)
@click.argument('channel')
@Args.config

Check failure on line 27 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.api

Check failure on line 28 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
def subscribe(config, api, channel, secrets):
"""Subscribe to a Pub/Sub channel"""
configs = kernelci.config.load(config)
Expand All @@ -37,8 +37,8 @@

@kci_event.command(secrets=True)
@click.argument('sub_id')
@Args.config

Check failure on line 40 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.api

Check failure on line 41 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
def unsubscribe(config, api, sub_id, secrets):
"""Unsubscribe from a Pub/Sub channel"""
configs = kernelci.config.load(config)
Expand All @@ -49,8 +49,8 @@

@kci_event.command(secrets=True)
@click.option('--is-json', help="Parse input data as JSON", is_flag=True)
@Args.config

Check failure on line 52 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.api

Check failure on line 53 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@click.argument('channel')
def send(config, api, is_json, channel, secrets):
"""Read some data on stdin and send it as an event on a channel"""
Expand All @@ -65,9 +65,9 @@

@kci_event.command(secrets=True)
@click.argument('sub_id')
@Args.config

Check failure on line 68 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.api

Check failure on line 69 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.indent

Check failure on line 70 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
def receive(config, api, indent, sub_id, secrets):
"""Wait and receive an event from a subscription and print on stdout"""
configs = kernelci.config.load(config)
Expand All @@ -81,3 +81,39 @@
click.echo(json.dumps(event, indent=indent))
else:
click.echo(event)


@kci_event.command(secrets=True)
@click.option('--is-json', help="Parse input data as JSON", is_flag=True)
@Args.config

Check failure on line 88 in kernelci/cli/event.py

View workflow job for this annotation

GitHub Actions / Lint

Access to generic instance variables via class is ambiguous [misc]
@Args.api
@click.argument('list_name')
def push(config, api, is_json, list_name, secrets):
"""Read some data on stdin and push it as an event on a list"""
configs = kernelci.config.load(config)
api_config = configs['api'][api]
api = kernelci.api.get_api(api_config, secrets.api.token)
data = sys.stdin.read()
if is_json:
data = json.loads(data)
api.push_event(list_name, {'data': data})


@kci_event.command(secrets=True)
@click.argument('list_name')
@Args.config
@Args.api
@Args.indent
def pop(config, api, indent, list_name, secrets):
"""Wait and pop an event from a List when received print on stdout"""
configs = kernelci.config.load(config)
api_config = configs['api'][api]
api = kernelci.api.get_api(api_config, secrets.api.token)
helper = kernelci.api.helper.APIHelper(api)
event = helper.pop_event_data(list_name)
if isinstance(event, str):
click.echo(event.strip())
elif isinstance(event, dict):
click.echo(json.dumps(event, indent=indent))
else:
click.echo(event)
Loading