diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/inbs/operation.py b/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/inbs/operation.py index 9aa977c48..7007052f2 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/inbs/operation.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/inbs/operation.py @@ -3,7 +3,6 @@ SPDX-License-Identifier: Apache-2.0 """ - import xml.etree.ElementTree as ET from google.protobuf.timestamp_pb2 import Timestamp from cloudadapter.pb.common.v1.common_pb2 import UpdateSystemSoftwareOperation, RpcActivateOperation, Operation, Schedule @@ -115,7 +114,6 @@ def convert_operation_to_xml_manifests(operation: Operation) -> ET.Element: result.append(xml_manifest) return result - def convert_rpc_activate_operation_to_xml_manifest(operation: RpcActivateOperation) -> str: """Converts a RpcActivateOperation message to an XML manifest string for Dispatcher.""" # Create the root element @@ -123,11 +121,7 @@ def convert_rpc_activate_operation_to_xml_manifest(operation: RpcActivateOperati ET.SubElement(manifest, 'type').text = 'cmd' cmd = ET.SubElement(manifest, 'cmd') - header = ET.SubElement(cmd, 'header') - ET.SubElement(header, 'type').text = 'rpc' - - type = ET.SubElement(cmd, 'type') - rpc = ET.SubElement(type, 'rpc') + rpc = ET.SubElement(cmd, 'rpc') if operation.url: ET.SubElement(rpc, 'fetch').text = operation.url @@ -140,8 +134,6 @@ def convert_rpc_activate_operation_to_xml_manifest(operation: RpcActivateOperati xml_str = ET.tostring(manifest, encoding='utf-8', method='xml').decode('utf-8') return xml_declaration + '\n' + xml_str - - def convert_system_software_operation_to_xml_manifest(operation: UpdateSystemSoftwareOperation) -> str: """Converts a UpdateSystemSoftwareOperation message to an XML manifest string for Dispatcher.""" # Create the root element diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/adapters/inbs/test_operation.py b/inbm/cloudadapter-agent/tests/unit/cloud/adapters/inbs/test_operation.py index df36e46e9..91f03abe3 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/adapters/inbs/test_operation.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/adapters/inbs/test_operation.py @@ -37,10 +37,10 @@ ) RPC_OPERATION_LARGE_MANIFEST_XML = ( - '\n' - "cmd
rpc
" - 'http://example.com/serverUDM' - "
" + '' + "cmdrpc" + "http://example.com/serverUDM" + "" ) diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py index 468f30779..c6232cd2b 100644 --- a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py +++ b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py @@ -34,6 +34,7 @@ from inbm_common_lib.exceptions import UrlSecurityException from .schedule.manifest_parser import ScheduleManifestParser, SCHEDULE_SCHEMA_LOCATION +from .schedule.rpc_activate_operation import RpcActivateOperation from .schedule.schedules import Schedule from .schedule.sqlite_manager import SqliteManager from .schedule.apscheduler import APScheduler @@ -260,6 +261,14 @@ def _perform_cmd_type_operation(self, parsed_head: XmlHandler, xml: str) -> Resu elif cmd == "query": self._dispatcher_broker.mqtt_publish(QUERY_CMD_CHANNEL, xml) return PUBLISH_SUCCESS + elif cmd == "rpc": + url = parsed_head.get_children('rpc')['fetch'] + name = parsed_head.get_children('rpc')['profileName'] + rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(url, name) + if rpc_result == 'success': + self._send_result(message= "Successful RPC Activation Operation") + else: + self._send_result(message= "Failed RPC Activation Operation") elif cmd == "custom": header = parsed_head.get_children('custom') json_data = header['data'] diff --git a/inbm/dispatcher-agent/dispatcher/schedule/rpc_activate_operation.py b/inbm/dispatcher-agent/dispatcher/schedule/rpc_activate_operation.py new file mode 100644 index 000000000..c6c4ce773 --- /dev/null +++ b/inbm/dispatcher-agent/dispatcher/schedule/rpc_activate_operation.py @@ -0,0 +1,33 @@ +""" + Executes the AMT Rpc Activation command. + + Copyright (C) 2024 Intel Corporation + SPDX-License-Identifier: Apache-2.0 +""" +from typing import Optional +import shlex +from inbm_common_lib.shell_runner import PseudoShellRunner + + + +class RpcActivateOperation: + def __init__(self) -> None: + """RpcActivateOperation class is to execute rpc activate command + """ + pass + + def execute_rpc_activation_cmd(self, url, name) -> str: + """Executes the RPC activation command. + + @param url: url address of the RPS + @param name: profile name used for rpc configuration + @return: return 'success' if execution succeeds else returns 'Failure' + """ + url = shlex.quote(url) + name = shlex.quote(name) + command = f"rpc activate -u {url}/activate -n --profile {name}" + (out, err, code) = PseudoShellRunner().run(command) + if code == 0 and 'CIRA: Configured' in out: + return "Success" + else: + return "Failure" diff --git a/inbm/dispatcher-agent/tests/unit/schedule/test_rpc_activate_operation.py b/inbm/dispatcher-agent/tests/unit/schedule/test_rpc_activate_operation.py new file mode 100644 index 000000000..fbf91ece9 --- /dev/null +++ b/inbm/dispatcher-agent/tests/unit/schedule/test_rpc_activate_operation.py @@ -0,0 +1,28 @@ +""" + Execute Rpc Activation command functionality test. + + Copyright (C) 2024 Intel Corporation + SPDX-License-Identifier: Apache-2.0 +""" +from unittest import TestCase + +from unittest.mock import patch +from dispatcher.schedule.rpc_activate_operation import * + + +class TestRpcActivateOperation(TestCase): + + def setUp(self) -> None: + self.url = "wss://1.1.1.1" + self.profile_name = "profilename" + pass + + @patch("inbm_common_lib.shell_runner.PseudoShellRunner.run", return_value=('Success', "", 0)) + def test_execute_rpc_activation_cmd_success(self, mock_run) -> None: + rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(self.url, self.profile_name) + self.assertEqual(rpc_result, 'Success') + + @patch("inbm_common_lib.shell_runner.PseudoShellRunner.run", return_value=('Failure', "", 0)) + def test_execute_rpc_activation_cmd_failure(self, mock_run) -> None: + rpc_result = RpcActivateOperation().execute_rpc_activation_cmd(self.url, self.profile_name) + self.assertEqual(rpc_result, 'Failure') \ No newline at end of file