Skip to content

Commit

Permalink
Add sample scripts for pub, sub and rpc in tdk module
Browse files Browse the repository at this point in the history
  • Loading branch information
neelam-kushwah committed Aug 9, 2024
1 parent 48ec02d commit f2870b1
Show file tree
Hide file tree
Showing 9 changed files with 274 additions and 5 deletions.
8 changes: 4 additions & 4 deletions simulator/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,22 @@
SPDX-License-Identifier: Apache-2.0
"""

import asyncio
import os
import sys
import time

from tdk.apis.apis import TdkApis

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import asyncio
import time

from flask import request
from flask_socketio import SocketIO

from simulator.ui import create_app
from simulator.ui.config import config_dict
from simulator.ui.utils.socket_utils import SocketUtility
from simulator.utils import constant
from tdk.apis.apis import TdkApis
from tdk.helper import someip_helper
from tdk.helper.transport_configuration import TransportConfiguration

Expand Down
Empty file added tdk/__init__.py
Empty file.
3 changes: 2 additions & 1 deletion tdk/core/abstract_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,10 @@ async def start_rpc_service(self) -> bool:
for attr1 in dir(getattr(self, attr)):
if attr1 == 'handle_request':
func = getattr(self, attr)
if attr == '__class__':
continue
method_uri_str = protobuf_autoloader.get_rpc_uri_by_name(self.service_id, attr)
method_uri = protobuf_autoloader.get_uuri_from_name(method_uri_str)

status = await self.tdk_apis.register_request_handler(method_uri, func)
service_util.print_register_rpc_status(method_uri_str, status.code, status.message)

Expand Down
Empty file added tdk/examples/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions tdk/examples/common_methods.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""

import logging

from uprotocol.v1.uri_pb2 import UUri

from tdk.helper.transport_configuration import TransportConfiguration

# Configure the logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')


def get_transport_config():
"""Set the helper config, one time configuration."""
config = TransportConfiguration()
config.set_zenoh_config("10.0.0.33", 9090) # this will set the zenoh helper
return config


def create_method_uri():
return UUri(authority_name="Neelam", ue_id=4, ue_version_major=1, resource_id=3)
65 changes: 65 additions & 0 deletions tdk/examples/publish.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

import asyncio

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.v1.uri_pb2 import UUri

from simulator.mockservices.hello_world import HelloWorldService
from tdk.apis.apis import TdkApis
from tdk.examples import common_methods


async def publish_to_zenoh():
# create topic uuri
topic = UUri(ue_id=4, ue_version_major=1, resource_id=0x8000)

# payload to publish UPayload.pack(/*your proto payload*/)
payload = UPayload.pack(UUri())

config = common_methods.get_transport_config()
tdk_apis = TdkApis(config)

status = await tdk_apis.publish(topic, CallOptions.DEFAULT, payload)
common_methods.logging.debug(f"Publish status {status}")
# Sleep for 3 seconds to simulate delay
await asyncio.sleep(3)


async def publish_to_one_second_event_to_zenoh():
config = common_methods.get_transport_config()
tdk_apis = TdkApis(config)

# create topic uuri
one_second_topic = "/example.hello_world/1/one_second#Timer"
timer_data = {'resource_id': 'one_second', 'time.hours': 2, 'time.minutes': 1, 'time.nanos': 2, 'time.seconds': 3}

# start service
service = HelloWorldService(transport_config=config, tdk_apis=tdk_apis)
await service.start()

status = await service.publish(one_second_topic, timer_data)
common_methods.logging.debug(f"Publish status {status}")
await asyncio.sleep(3)


if __name__ == '__main__':
asyncio.run(publish_to_one_second_event_to_zenoh())
51 changes: 51 additions & 0 deletions tdk/examples/rpc_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

import asyncio

from uprotocol.communication.calloptions import CallOptions
from uprotocol.communication.upayload import UPayload
from uprotocol.v1.uattributes_pb2 import (
UPayloadFormat,
)

from tdk.apis.apis import TdkApis
from tdk.examples import common_methods


async def send_rpc_request_to_zenoh():
# create uuri
method_uri = common_methods.create_method_uri()

# create UPayload
data = "GetCurrentTime" # your request payload
request_payload = UPayload(format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT, data=bytes([ord(c) for c in data]))

# invoke RPC method
common_methods.logging.debug(f"Send request to {method_uri}")

config = common_methods.get_transport_config()
tdk_apis = TdkApis(config)

response_payload = await tdk_apis.invoke_method(method_uri, request_payload, CallOptions(timeout=1000))
common_methods.logging.debug(f"RESPONSE PAYLOAD {response_payload}")


if __name__ == '__main__':
asyncio.run(send_rpc_request_to_zenoh())
58 changes: 58 additions & 0 deletions tdk/examples/rpc_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

import asyncio
from datetime import datetime

from uprotocol.communication.requesthandler import RequestHandler
from uprotocol.communication.upayload import UPayload
from uprotocol.v1.uattributes_pb2 import (
UPayloadFormat,
)
from uprotocol.v1.umessage_pb2 import UMessage

from tdk.apis.apis import TdkApis
from tdk.examples import common_methods


class MyRequestHandler(RequestHandler):
def handle_request(self, msg: UMessage) -> UPayload:
common_methods.logging.debug("Request Received by Service Request Handler")
attributes = msg.attributes
payload = msg.payload
source = attributes.source
sink = attributes.sink
common_methods.logging.debug(f"Receive {payload} from {source} to {sink}")
response_payload = format(datetime.utcnow()).encode('utf-8') # your response payload
payload = UPayload(data=response_payload, format=UPayloadFormat.UPAYLOAD_FORMAT_TEXT)
return payload


async def register_rpc():
uuri = common_methods.create_method_uri()
config = common_methods.get_transport_config()
tdk_apis = TdkApis(config)
status = await tdk_apis.register_request_handler(uuri, MyRequestHandler())
common_methods.logging.debug(f"Request Handler Register status {status}")
while True:
await asyncio.sleep(1)


if __name__ == '__main__':
asyncio.run(register_rpc())
61 changes: 61 additions & 0 deletions tdk/examples/subscribe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""

import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..', '..')))

import asyncio

from uprotocol.transport.ulistener import UListener
from uprotocol.v1.umessage_pb2 import UMessage

from tdk.apis.apis import TdkApis
from tdk.core import protobuf_autoloader
from tdk.examples import common_methods


class MyListener(UListener):
async def on_receive(self, msg: UMessage):
common_methods.logging.debug('on receive called')
common_methods.logging.debug(msg.payload)
common_methods.logging.debug(msg.attributes.__str__())


async def subscribe_if_subscription_service_is_running():
status = await tdk_apis.subscribe(topic, listener)
common_methods.logging.debug(f"Register Listener status {status}")
while True:
await asyncio.sleep(1)


async def subscribe_if_subscription_service_is_not_running():
status = await tdk_apis.register_listener(topic, listener)
common_methods.logging.debug(f"Register Listener status {status}")
while True:
await asyncio.sleep(1)


if __name__ == '__main__':
# create topic uuri
one_second_topic = "/example.hello_world/1/one_second#Timer"
topic = protobuf_autoloader.get_uuri_from_name(one_second_topic)

listener = MyListener()
config = common_methods.get_transport_config()
tdk_apis = TdkApis(config)
# if subscription service is not available, use up-L1 (Transport) apis
asyncio.run(subscribe_if_subscription_service_is_not_running())
# if subscription service is running, use up-L2 (Communication) apis

0 comments on commit f2870b1

Please sign in to comment.