From f2870b1bfab9af433ddc1f50881ec9fb1cc14004 Mon Sep 17 00:00:00 2001 From: Neelam Kushwah Date: Fri, 9 Aug 2024 14:20:10 -0400 Subject: [PATCH] Add sample scripts for pub, sub and rpc in tdk module --- simulator/run.py | 8 ++--- tdk/__init__.py | 0 tdk/core/abstract_service.py | 3 +- tdk/examples/__init__.py | 0 tdk/examples/common_methods.py | 33 +++++++++++++++++ tdk/examples/publish.py | 65 ++++++++++++++++++++++++++++++++++ tdk/examples/rpc_client.py | 51 ++++++++++++++++++++++++++ tdk/examples/rpc_server.py | 58 ++++++++++++++++++++++++++++++ tdk/examples/subscribe.py | 61 +++++++++++++++++++++++++++++++ 9 files changed, 274 insertions(+), 5 deletions(-) create mode 100644 tdk/__init__.py create mode 100644 tdk/examples/__init__.py create mode 100644 tdk/examples/common_methods.py create mode 100644 tdk/examples/publish.py create mode 100644 tdk/examples/rpc_client.py create mode 100644 tdk/examples/rpc_server.py create mode 100644 tdk/examples/subscribe.py diff --git a/simulator/run.py b/simulator/run.py index c6a102a..02409d8 100644 --- a/simulator/run.py +++ b/simulator/run.py @@ -19,15 +19,14 @@ SPDX-License-Identifier: Apache-2.0 """ -import asyncio import os import sys -import time - -from tdk.apis.apis import TdkApis sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))) +import asyncio +import time + from flask import request from flask_socketio import SocketIO @@ -35,6 +34,7 @@ from simulator.ui.config import config_dict from simulator.ui.utils.socket_utils import SocketUtility from simulator.utils import constant +from tdk.apis.apis import TdkApis from tdk.helper import someip_helper from tdk.helper.transport_configuration import TransportConfiguration diff --git a/tdk/__init__.py b/tdk/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tdk/core/abstract_service.py b/tdk/core/abstract_service.py index 2fa78e5..0c65b29 100644 --- a/tdk/core/abstract_service.py +++ b/tdk/core/abstract_service.py @@ -120,9 +120,10 @@ async def start_rpc_service(self) -> bool: for attr1 in dir(getattr(self, attr)): if attr1 == 'handle_request': func = getattr(self, attr) + if attr == '__class__': + continue method_uri_str = protobuf_autoloader.get_rpc_uri_by_name(self.service_id, attr) method_uri = protobuf_autoloader.get_uuri_from_name(method_uri_str) - status = await self.tdk_apis.register_request_handler(method_uri, func) service_util.print_register_rpc_status(method_uri_str, status.code, status.message) diff --git a/tdk/examples/__init__.py b/tdk/examples/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tdk/examples/common_methods.py b/tdk/examples/common_methods.py new file mode 100644 index 0000000..e43c541 --- /dev/null +++ b/tdk/examples/common_methods.py @@ -0,0 +1,33 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import logging + +from uprotocol.v1.uri_pb2 import UUri + +from tdk.helper.transport_configuration import TransportConfiguration + +# Configure the logging +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s') + + +def get_transport_config(): + """Set the helper config, one time configuration.""" + config = TransportConfiguration() + config.set_zenoh_config("10.0.0.33", 9090) # this will set the zenoh helper + return config + + +def create_method_uri(): + return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3) diff --git a/tdk/examples/publish.py b/tdk/examples/publish.py new file mode 100644 index 0000000..c0f124a --- /dev/null +++ b/tdk/examples/publish.py @@ -0,0 +1,65 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +import asyncio + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uri_pb2 import UUri + +from simulator.mockservices.hello_world import HelloWorldService +from tdk.apis.apis import TdkApis +from tdk.examples import common_methods + + +async def publish_to_zenoh(): + # create topic uuri + topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000) + + # payload to publish UPayload.pack(/*your proto payload*/) + payload = UPayload.pack(UUri()) + + config = common_methods.get_transport_config() + tdk_apis = TdkApis(config) + + status = await tdk_apis.publish(topic, CallOptions.DEFAULT, payload) + common_methods.logging.debug(f"Publish status {status}") + # Sleep for 3 seconds to simulate delay + await asyncio.sleep(3) + + +async def publish_to_one_second_event_to_zenoh(): + config = common_methods.get_transport_config() + tdk_apis = TdkApis(config) + + # create topic uuri + one_second_topic = "/example.hello_world/1/one_second#Timer" + timer_data = {'resource_id': 'one_second', 'time.hours': 2, 'time.minutes': 1, 'time.nanos': 2, 'time.seconds': 3} + + # start service + service = HelloWorldService(transport_config=config, tdk_apis=tdk_apis) + await service.start() + + status = await service.publish(one_second_topic, timer_data) + common_methods.logging.debug(f"Publish status {status}") + await asyncio.sleep(3) + + +if __name__ == '__main__': + asyncio.run(publish_to_one_second_event_to_zenoh()) diff --git a/tdk/examples/rpc_client.py b/tdk/examples/rpc_client.py new file mode 100644 index 0000000..ee15fe4 --- /dev/null +++ b/tdk/examples/rpc_client.py @@ -0,0 +1,51 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +import asyncio + +from uprotocol.communication.calloptions import CallOptions +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) + +from tdk.apis.apis import TdkApis +from tdk.examples import common_methods + + +async def send_rpc_request_to_zenoh(): + # create uuri + method_uri = common_methods.create_method_uri() + + # create UPayload + data = "GetCurrentTime" # your request payload + request_payload = UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, data=bytes([ord(c) for c in data])) + + # invoke RPC method + common_methods.logging.debug(f"Send request to {method_uri}") + + config = common_methods.get_transport_config() + tdk_apis = TdkApis(config) + + response_payload = await tdk_apis.invoke_method(method_uri, request_payload, CallOptions(timeout=1000)) + common_methods.logging.debug(f"RESPONSE PAYLOAD {response_payload}") + + +if __name__ == '__main__': + asyncio.run(send_rpc_request_to_zenoh()) diff --git a/tdk/examples/rpc_server.py b/tdk/examples/rpc_server.py new file mode 100644 index 0000000..194d616 --- /dev/null +++ b/tdk/examples/rpc_server.py @@ -0,0 +1,58 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +import asyncio +from datetime import datetime + +from uprotocol.communication.requesthandler import RequestHandler +from uprotocol.communication.upayload import UPayload +from uprotocol.v1.uattributes_pb2 import ( + UPayloadFormat, +) +from uprotocol.v1.umessage_pb2 import UMessage + +from tdk.apis.apis import TdkApis +from tdk.examples import common_methods + + +class MyRequestHandler(RequestHandler): + def handle_request(self, msg: UMessage) -> UPayload: + common_methods.logging.debug("Request Received by Service Request Handler") + attributes = msg.attributes + payload = msg.payload + source = attributes.source + sink = attributes.sink + common_methods.logging.debug(f"Receive {payload} from {source} to {sink}") + response_payload = format(datetime.utcnow()).encode('utf-8') # your response payload + payload = UPayload(data=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT) + return payload + + +async def register_rpc(): + uuri = common_methods.create_method_uri() + config = common_methods.get_transport_config() + tdk_apis = TdkApis(config) + status = await tdk_apis.register_request_handler(uuri, MyRequestHandler()) + common_methods.logging.debug(f"Request Handler Register status {status}") + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(register_rpc()) diff --git a/tdk/examples/subscribe.py b/tdk/examples/subscribe.py new file mode 100644 index 0000000..1d743c7 --- /dev/null +++ b/tdk/examples/subscribe.py @@ -0,0 +1,61 @@ +""" +SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation + +See the NOTICE file(s) distributed with this work for additional +information regarding copyright ownership. + +This program and the accompanying materials are made available under the +terms of the Apache License Version 2.0 which is available at + + http://www.apache.org/licenses/LICENSE-2.0 + +SPDX-License-Identifier: Apache-2.0 +""" + +import os +import sys + +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..'))) + +import asyncio + +from uprotocol.transport.ulistener import UListener +from uprotocol.v1.umessage_pb2 import UMessage + +from tdk.apis.apis import TdkApis +from tdk.core import protobuf_autoloader +from tdk.examples import common_methods + + +class MyListener(UListener): + async def on_receive(self, msg: UMessage): + common_methods.logging.debug('on receive called') + common_methods.logging.debug(msg.payload) + common_methods.logging.debug(msg.attributes.__str__()) + + +async def subscribe_if_subscription_service_is_running(): + status = await tdk_apis.subscribe(topic, listener) + common_methods.logging.debug(f"Register Listener status {status}") + while True: + await asyncio.sleep(1) + + +async def subscribe_if_subscription_service_is_not_running(): + status = await tdk_apis.register_listener(topic, listener) + common_methods.logging.debug(f"Register Listener status {status}") + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + # create topic uuri + one_second_topic = "/example.hello_world/1/one_second#Timer" + topic = protobuf_autoloader.get_uuri_from_name(one_second_topic) + + listener = MyListener() + config = common_methods.get_transport_config() + tdk_apis = TdkApis(config) + # if subscription service is not available, use up-L1 (Transport) apis + asyncio.run(subscribe_if_subscription_service_is_not_running()) + # if subscription service is running, use up-L2 (Communication) apis