Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add functionality to Dispatcher-agent for handling RPC command Reques… #568

Draft
wants to merge 12 commits into
base: develop
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
SPDX-License-Identifier: Apache-2.0
"""


import xml.etree.ElementTree as ET
from google.protobuf.timestamp_pb2 import Timestamp
from cloudadapter.pb.common.v1.common_pb2 import UpdateSystemSoftwareOperation, RpcActivateOperation, Operation, Schedule
Expand Down Expand Up @@ -115,19 +114,14 @@ def convert_operation_to_xml_manifests(operation: Operation) -> ET.Element:
result.append(xml_manifest)
return result


def convert_rpc_activate_operation_to_xml_manifest(operation: RpcActivateOperation) -> str:
"""Converts a RpcActivateOperation message to an XML manifest string for Dispatcher."""
# Create the root element
manifest = ET.Element('manifest')
ET.SubElement(manifest, 'type').text = 'cmd'

cmd = ET.SubElement(manifest, 'cmd')
header = ET.SubElement(cmd, 'header')
ET.SubElement(header, 'type').text = 'rpc'

type = ET.SubElement(cmd, 'type')
rpc = ET.SubElement(type, 'rpc')
rpc = ET.SubElement(cmd, 'rpc')

if operation.url:
ET.SubElement(rpc, 'fetch').text = operation.url
Expand All @@ -140,8 +134,6 @@ def convert_rpc_activate_operation_to_xml_manifest(operation: RpcActivateOperati
xml_str = ET.tostring(manifest, encoding='utf-8', method='xml').decode('utf-8')
return xml_declaration + '\n' + xml_str



def convert_system_software_operation_to_xml_manifest(operation: UpdateSystemSoftwareOperation) -> str:
"""Converts a UpdateSystemSoftwareOperation message to an XML manifest string for Dispatcher."""
# Create the root element
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@
)

RPC_OPERATION_LARGE_MANIFEST_XML = (
'<?xml version="1.0" encoding="utf-8"?>\n'
"<manifest><type>cmd</type><cmd><header><type>rpc</type></header>"
'<type><rpc><fetch>http://example.com/server</fetch><profileName>UDM</profileName>'
"</rpc></type></cmd></manifest>"
'<?xml version="1.0" encoding="utf-8"?>'
"<manifest><type>cmd</type><cmd>rpc</cmd>"
"<rpc><fetch>http://example.com/server</fetch><profileName>UDM</profileName>"
"</rpc></manifest>"
)


Expand Down
9 changes: 9 additions & 0 deletions inbm/dispatcher-agent/dispatcher/dispatcher_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from inbm_common_lib.exceptions import UrlSecurityException

from .schedule.manifest_parser import ScheduleManifestParser, SCHEDULE_SCHEMA_LOCATION
from .schedule.rpc_activate_operation import RpcActivateOperation
from .schedule.schedules import Schedule
from .schedule.sqlite_manager import SqliteManager
from .schedule.apscheduler import APScheduler
Expand Down Expand Up @@ -260,6 +261,14 @@ def _perform_cmd_type_operation(self, parsed_head: XmlHandler, xml: str) -> Resu
elif cmd == "query":
self._dispatcher_broker.mqtt_publish(QUERY_CMD_CHANNEL, xml)
return PUBLISH_SUCCESS
elif cmd == "rpc":
tsirlapu marked this conversation as resolved.
Show resolved Hide resolved
url = parsed_head.get_children('rpc')['fetch']
name = parsed_head.get_children('rpc')['profileName']
rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(url, name)
if rpc_result == 'success':
self._send_result(message= "Successful RPC Activation Operation")
else:
self._send_result(message= "Failed RPC Activation Operation")
elif cmd == "custom":
header = parsed_head.get_children('custom')
json_data = header['data']
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Executes the AMT Rpc Activation command.

Copyright (C) 2024 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""
from typing import Optional
import shlex
from inbm_common_lib.shell_runner import PseudoShellRunner



class RpcActivateOperation:
def __init__(self) -> None:
"""RpcActivateOperation class is to execute rpc activate command
"""
pass

def execute_rpc_activation_cmd(self, url, name) -> str:
tsirlapu marked this conversation as resolved.
Show resolved Hide resolved
"""Executes the RPC activation command.

@param url: url address of the RPS
@param name: profile name used for rpc configuration
@return: return 'success' if execution succeeds else returns 'Failure'
"""
url = shlex.quote(url)
name = shlex.quote(name)
command = f"rpc activate -u {url}/activate -n --profile {name}"
(out, err, code) = PseudoShellRunner().run(command)
tsirlapu marked this conversation as resolved.
Show resolved Hide resolved
if code == 0 and 'CIRA: Configured' in out:
return "Success"
else:
return "Failure"
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""
Execute Rpc Activation command functionality test.

Copyright (C) 2024 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""
from unittest import TestCase

from unittest.mock import patch
from dispatcher.schedule.rpc_activate_operation import *


class TestRpcActivateOperation(TestCase):

def setUp(self) -> None:
self.url = "wss://1.1.1.1"
self.profile_name = "profilename"
pass

@patch("inbm_common_lib.shell_runner.PseudoShellRunner.run", return_value=('Success', "", 0))
def test_execute_rpc_activation_cmd_success(self, mock_run) -> None:
rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(self.url, self.profile_name)
self.assertEqual(rpc_result, 'Success')

@patch("inbm_common_lib.shell_runner.PseudoShellRunner.run", return_value=('Failure', "", 0))
def test_execute_rpc_activation_cmd_failure(self, mock_run) -> None:
rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(self.url, self.profile_name)
self.assertEqual(rpc_result, 'Failure')