diff --git a/inbm-lib/inbm_lib/constants.py b/inbm-lib/inbm_lib/constants.py
index 60244c9bb..2fa9faf13 100644
--- a/inbm-lib/inbm_lib/constants.py
+++ b/inbm-lib/inbm_lib/constants.py
@@ -4,7 +4,7 @@
     @license: SPDX-License-Identifier: Apache-2.0
 """
 from inbm_common_lib.utility import get_canonical_representation_of_path
-from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_CACHE_PATH_PREFIX, LOG_PATH
+from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_CACHE_PATH_PREFIX, LOG_PATH, INTEL_MANAGEABILITY_SHARE_PATH_PREFIX
 
 COMPOSE = 'compose'
 DOCKER = 'docker'
@@ -83,6 +83,12 @@
 PACKAGE_INSTALL = "install"
 PACKAGE_UPGRADE = "upgrade"
 
+# Default config JSON schema location
+CONFIG_JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
+                                  'dispatcher-agent' / 'config_param_schema.json')
+NODE_UPDATE_JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
+                                       'dispatcher-agent' / 'node_update_schema.json')
+
 # OTA STATUS
 OTA_SUCCESS = "SUCCESS"
 FAIL = "FAIL"
diff --git a/inbm-lib/inbm_lib/json_validator.py b/inbm-lib/inbm_lib/json_validator.py
new file mode 100644
index 000000000..2ec112e30
--- /dev/null
+++ b/inbm-lib/inbm_lib/json_validator.py
@@ -0,0 +1,61 @@
+"""
+    Copyright (C) 2017-2024 Intel Corporation
+    SPDX-License-Identifier: Apache-2.0
+"""
+
+
+import json
+import logging
+import os
+import jsonschema
+from typing import Optional, Any
+
+from inbm_common_lib.utility import get_canonical_representation_of_path
+
+from .constants import CONFIG_JSON_SCHEMA_LOCATION
+
+logger = logging.getLogger(__name__)
+
+
+def _get_schema_location(schema_location: Optional[str] = None) -> str:
+    if not schema_location:
+        schema_location = CONFIG_JSON_SCHEMA_LOCATION
+    return schema_location
+
+
+def _validate_schema(params: str, schema_location: Optional[str] = None) -> Any:
+    """Validates JSON against a JSON schema
+
+    @param params: JSON parameters
+    @param schema_location: JSON schema location. Default=None
+    @return: deserialized JSON
+    """
+    schema_location = _get_schema_location(schema_location)
+
+    if not os.path.exists(schema_location):
+        logger.error(f"JSON Schema file not found: {schema_location}")
+        raise ValueError(f"JSON Schema file not found: {schema_location}")
+
+    try:
+        with open(get_canonical_representation_of_path(schema_location)) as schema_file:
+            schema = json.loads(schema_file.read())
+
+        parsed = json.loads(str(params))
+        jsonschema.validate(parsed, schema)
+    except (ValueError, OSError, jsonschema.exceptions.ValidationError) as e:
+        raise ValueError(f"Schema validation failed! Error: {e}")
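+    # Hand the deserialized object back so callers do not have to parse the JSON a second time.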
+    return parsed
+
+
+def is_valid_json_structure(json_params: str, schema_location: Optional[str] = None) -> bool:
+    """Validate the JSON structure against the schema
+
+    @param json_params: JSON params to be validated
+    @param schema_location: location of schema file; default=None
+    @return (bool): True if valid schema; otherwise, False
+    """
+    try:
+        _validate_schema(json_params, schema_location)
+    except (ValueError, KeyError, jsonschema.exceptions.ValidationError) as e:
+        logger.info("Error validating JSON structure against schema: %s", str(e))
+        return False
+    return True
diff --git a/inbm-lib/setup.py b/inbm-lib/setup.py
index e50819715..4d4c6f071 100644
--- a/inbm-lib/setup.py
+++ b/inbm-lib/setup.py
@@ -15,7 +15,7 @@
       license='Intel Proprietary',
       packages=['inbm_lib', 'inbm_common_lib'],
       include_package_data=True,
-      install_requires=['paho-mqtt==1.6.0', 'types-paho-mqtt==1.6.0.7', 'xmlschema==1.5.3', 'defusedxml==0.7.1', 'url-normalize==1.4.3', 'snoop==0.4.3', 'types-setuptools==71.1.0.20240813'],
+      install_requires=['paho-mqtt==1.6.0', 'types-paho-mqtt==1.6.0.7', 'xmlschema==1.5.3', 'defusedxml==0.7.1', 'url-normalize==1.4.3', 'snoop==0.4.3', 'types-setuptools==71.1.0.20240813', 'jsonschema==4.20.0', 'types-jsonschema==4.20.0.0'],
       test_suite='pytest',
       tests_require=test_deps,
       extras_require=extras,
diff --git a/inbm-lib/tests/unit/inbm_lib/test_json_validator.py b/inbm-lib/tests/unit/inbm_lib/test_json_validator.py
new file mode 100644
index 000000000..e5b9e4d60
--- /dev/null
+++ b/inbm-lib/tests/unit/inbm_lib/test_json_validator.py
@@ -0,0 +1,80 @@
+import os
+import unittest
+from unittest import TestCase
+
+from inbm_lib.json_validator import is_valid_json_structure, _get_schema_location
+
+TEST_CONFIG_JSON_SCHEMA_LOCATION = os.path.join(
+    os.path.dirname(__file__),
+    '..',
+    '..',
+    '..',
+    '..',
+    'inbm',
+    'dispatcher-agent',
+    'fpm-template',
+    'usr',
+    'share',
+    'dispatcher-agent',
+    'config_param_schema.json',
+    )
+
+TEST_NODE_UPDATE_JSON_SCHEMA_LOCATION = os.path.join(
+    os.path.dirname(__file__),
+    '..',
+    '..',
+    '..',
+    '..',
+    'inbm',
+    'dispatcher-agent',
+    'fpm-template',
+    'usr',
+    'share',
+    'dispatcher-agent',
+    'node_update_schema.json',
+    )
+
+
+class TestJsonValidator(TestCase):
+
+    def test_validate_node_update_json_structure_pass(self) -> None:
+        json_params = '{"status":200, "message":"COMMAND SUCCESSFUL", "jobId":"swupd-4b151b70-c121-4245-873b-5324ac7a3f7a"}'
+        result = is_valid_json_structure(json_params, TEST_NODE_UPDATE_JSON_SCHEMA_LOCATION)
+        self.assertTrue(result is True)
+
+    def test_json_parse_one_param_pass(self) -> None:
+        config_params = '{"execcmd":"abc"}'
+        result = is_valid_json_structure(config_params, TEST_CONFIG_JSON_SCHEMA_LOCATION)
+        self.assertTrue(result is True)
+
+    def test_json_parse_two_param_pass(self) -> None:
+        config_params = '{"execcmd":"abc", "device":["abcd","def"]}'
+        result = is_valid_json_structure(config_params, TEST_CONFIG_JSON_SCHEMA_LOCATION)
+        self.assertTrue(result is True)
+
+    def test_json_parse_param_fail(self) -> None:
+        config_params = '{"privileged":["yes"]}'
+        result = is_valid_json_structure(config_params, TEST_CONFIG_JSON_SCHEMA_LOCATION)
+        self.assertTrue(result is False)
+
+    def test_json_parse_no_param_fail(self) -> None:
+        config_params = ''
+        result = is_valid_json_structure(config_params, TEST_CONFIG_JSON_SCHEMA_LOCATION)
+        self.assertTrue(result is False)
+
+    def test_json_wrong_schema_location(self) -> None:
+        config_params = ''
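+        # An empty schema_location is falsy, so the validator falls back to the
+        # default installed schema path, which is absent in the test environment.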
+        result = is_valid_json_structure(config_params, '')
+        self.assertTrue(result is False)
+
+    def test_get_schema_returns_passed_schema(self) -> None:
+        self.assertEqual(_get_schema_location(
+            'test_schema.json'), 'test_schema.json')
+
+    def test_get_schema_returns_single_schema(self) -> None:
+        self.assertEqual(_get_schema_location(),
+                         '/usr/share/dispatcher-agent/config_param_schema.json')
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/inbm/Changelog.md b/inbm/Changelog.md
index 1a308d10e..7b140ea67 100644
--- a/inbm/Changelog.md
+++ b/inbm/Changelog.md
@@ -9,8 +9,9 @@
 The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).
  - (NEXMANAGE-598) Expanding INBC for handling TiberOS update cmd
  - Updated proto files to add new RPC calls to allow edge node to update its status with INBS.
- - (NEXMANAGE- 610) Add functionality to INBM Cloudadapter-agent to support OOB AMT RPC command requests from INBS
+ - (NEXMANAGE-610) Add functionality to INBM Cloudadapter-agent to support OOB AMT RPC command requests from INBS
 - Update TiberOS name to "tiber"
+ - (NEXMANAGE-613) Store scheduled updates in the DB, add a nodeUpdate communication stream, and add plumbing to return the correct jobID on scheduled requests.
 
 ### Changed
 - (NEXARL-306) Update agents' prerm script to prevent them from disabling and stopping if it's an upgrade process
diff --git a/inbm/cloudadapter-agent/cloudadapter/client.py b/inbm/cloudadapter-agent/cloudadapter/client.py
index 0800b1fdc..d781af40b 100644
--- a/inbm/cloudadapter-agent/cloudadapter/client.py
+++ b/inbm/cloudadapter-agent/cloudadapter/client.py
@@ -167,7 +167,6 @@ def stop(self) -> None:
         logger.debug("Stopping cloudadapter client")
         self._broker.stop()
         self._cloud_publisher.publish_event("Disconnected")
-        self._cloud_publisher.publish_update("Disconnected")
         try:
             logger.debug("Calling disconnect on adapter")
             self._adapter.disconnect()
diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py b/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py
index cdd7d4739..fa3633783 100644
--- a/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py
+++ b/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py
@@ -7,6 +7,7 @@
 import json
 import queue
 import random
+import logging
 import threading
 from google.protobuf.timestamp_pb2 import Timestamp
 from typing import Callable, Optional
@@ -15,11 +16,12 @@
 from cloudadapter.cloud.adapters.inbs.operation import (
     convert_updated_scheduled_operations_to_dispatcher_xml,
 )
-from cloudadapter.constants import METHOD, DEAD
+from cloudadapter.constants import METHOD, DEAD, NODE_UPDATE_JSON_SCHEMA_LOCATION
 from cloudadapter.exceptions import AuthenticationError, PublishError
 from cloudadapter.pb.inbs.v1 import inbs_sb_pb2_grpc, inbs_sb_pb2
 from cloudadapter.pb.common.v1 import common_pb2
-import logging
+
+from inbm_lib.json_validator import is_valid_json_structure
 
 import grpc  # type: ignore
 from .cloud_client import CloudClient
@@ -117,18 +119,24 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None:
     def publish_update(self, key: str, value: str) -> None:
         """Publishes an update to the cloud
 
-        @param message: node update message to publish
+        @param key: key to publish
+        @param value: node update message to publish
         @exception PublishError: If publish fails
         """
         if self._grpc_channel is None:
             raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_update")
 
+        is_valid = is_valid_json_structure(value, NODE_UPDATE_JSON_SCHEMA_LOCATION)
+        if not is_valid:
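+            # Drop the malformed update rather than forwarding bad data to INBS.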
+            logger.error(f"JSON schema validation failed while verifying node_update message: {value}")
+            return
+
         # Turn the message into a dict
         logger.debug(f"Received node update: key={key}, value={value}")
         try:
             message_dict = json.loads(value)
         except json.JSONDecodeError as e:
-            logger.error(f"Cannot convert formatted message to dict: {value}. Error: {e}")
+            logger.error(f"Cannot convert node update formatted message to a dict type. message={value} error={e}")
             return
 
         status_code=message_dict.get("status", "")
@@ -141,7 +149,7 @@ def publish_update(self, key: str, value: str) -> None:
         timestamp = Timestamp()
         timestamp.GetCurrentTime()
         job=common_pb2.Job(
-            job_id=message_dict.get("job_id", ""),
+            job_id=message_dict.get("jobId", ""),
             node_id=self._client_id,
             status_code=status_code,
             result_msgs=result_messages,
@@ -149,6 +157,7 @@
             job_state=job_state
         )
+
 
         request = inbs_sb_pb2.SendNodeUpdateRequest(
             request_id="notused",
             job_update=job,
diff --git a/inbm/cloudadapter-agent/cloudadapter/constants.py b/inbm/cloudadapter-agent/cloudadapter/constants.py
index a31319bfd..5ccc086fd 100644
--- a/inbm/cloudadapter-agent/cloudadapter/constants.py
+++ b/inbm/cloudadapter-agent/cloudadapter/constants.py
@@ -122,3 +122,6 @@ class METHOD:
 # The system path to the JSON schema
 GENERIC_SCHEMA_PATH = INTEL_MANAGEABILITY_SHARE_PATH_PREFIX / \
     'cloudadapter-agent' / 'config_schema.json'
+
+NODE_UPDATE_JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
+                                       'cloudadapter-agent' / 'node_update_schema.json')
diff --git a/inbm/cloudadapter-agent/fpm-template/etc/apparmor.d/usr.bin.inbm-cloudadapter b/inbm/cloudadapter-agent/fpm-template/etc/apparmor.d/usr.bin.inbm-cloudadapter
index e3eede2a4..5d8ef1e07 100644
--- a/inbm/cloudadapter-agent/fpm-template/etc/apparmor.d/usr.bin.inbm-cloudadapter
+++ b/inbm/cloudadapter-agent/fpm-template/etc/apparmor.d/usr.bin.inbm-cloudadapter
@@ -31,6 +31,7 @@
   /etc/intel-manageability/secret/cloudadapter-agent/** r,
   /etc/intel-manageability/public/ucc-ca/ r,
   /usr/share/cloudadapter-agent/config_schema.json r,
+  /usr/share/cloudadapter-agent/node_update_schema.json r,
   /etc/intel-manageability/public/cloudadapter-agent/device_id rw,
   /etc/intel-manageability/public/mqtt-ca/mqtt-ca.crt r,
   /etc/intel-manageability/public/ucc-ca/ucc.ca.pem.crt r,
diff --git a/inbm/cloudadapter-agent/fpm-template/usr/share/cloudadapter-agent/node_update_schema.json b/inbm/cloudadapter-agent/fpm-template/usr/share/cloudadapter-agent/node_update_schema.json
new file mode 100644
index 000000000..fd2b0a451
--- /dev/null
+++ b/inbm/cloudadapter-agent/fpm-template/usr/share/cloudadapter-agent/node_update_schema.json
@@ -0,0 +1,21 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "status": {
+            "type": "integer",
+            "description": "The HTTP status code of the response."
+        },
+        "message": {
+            "type": "string",
+            "description": "A message describing the result of the command."
+        },
+        "jobId": {
+            "type": "string",
+            "description": "A unique identifier for the job.",
+            "pattern": "^[a-z0-9-]+$"
+        }
+    },
+    "required": ["status", "message", "jobId"],
+    "additionalProperties": false
+}
\ No newline at end of file
diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py
index dce7bdd3d..7589a1651 100644
--- a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py
+++ b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py
@@ -57,15 +57,17 @@ def test_publish_update(self, inbs_client: InbsCloudClient) -> None:
         mock_channel.SendNodeUpdateRequest.return_value = "MockResponse"
         inbs_client._grpc_channel = mock_channel
 
-        key = 'test-key'
-        value = '{"job_id": "12345", "status": 200, "message": "Update successful"}'
+        key = 'update'
+        value = '{"status":200, "message":"COMMAND SUCCESSFUL", "jobId":"swupd-4b151b70-c121-4245-873b-5324ac7a3f7a"}'
 
         # Call the publish_update method
-        inbs_client.publish_update(key, value)
+        with patch('cloudadapter.cloud.client.inbs_cloud_client.is_valid_json_structure', return_value=True):
+            inbs_client.publish_update(key, value)
 
         # Assert that the gRPC channel's SendNodeUpdate method was called
         mock_channel.SendNodeUpdate.assert_called_once()
-
+
+
     def test_publish_update_failure_no_grpc_channel(self, inbs_client: InbsCloudClient):
         # Ensure that _grpc_channel is None to simulate the channel not being set up
         inbs_client._grpc_channel = None
diff --git a/inbm/dispatcher-agent/dispatcher/constants.py b/inbm/dispatcher-agent/dispatcher/constants.py
index bb0ef7d81..16c7f83fe 100644
--- a/inbm/dispatcher-agent/dispatcher/constants.py
+++ b/inbm/dispatcher-agent/dispatcher/constants.py
@@ -51,8 +51,6 @@
 # Schema location
 SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
                       'dispatcher-agent' / 'manifest_schema.xsd')
-JSON_SCHEMA_LOCATION = str(INTEL_MANAGEABILITY_SHARE_PATH_PREFIX /
-                           'dispatcher-agent' / 'config_param_schema.json')
 
 # Client certs and keys path
 CLIENT_CERTS = str(BROKER_ETC_PATH / 'public' /
diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py b/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
index 3e4005fbd..733f07da5 100644
--- a/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
+++ b/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py
@@ -15,6 +15,8 @@
 from dispatcher.dispatcher_exception import DispatcherException
 from inbm_lib.mqttclient.config import DEFAULT_MQTT_HOST, DEFAULT_MQTT_PORT, MQTT_KEEPALIVE_INTERVAL
 from inbm_lib.mqttclient.mqtt import MQTT
+from inbm_lib.json_validator import is_valid_json_structure
+from inbm_lib.constants import NODE_UPDATE_JSON_SCHEMA_LOCATION
 
 from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, UPDATE_CHANNEL
 
@@ -48,19 +50,19 @@ def send_update(self, message: str) -> None:
         """
         logger.debug(f"Sending node update for to {UPDATE_CHANNEL} with message: {message}")
         self.mqtt_publish(topic=UPDATE_CHANNEL, payload=message)
-
+
     def _check_db_for_started_job(self) -> Optional[Schedule]:
         sqliteMgr = SqliteManager()
         schedule = sqliteMgr.get_any_started_schedule()
         logger.debug(f"Checking for started schedule in DB: schedule={schedule}")
-        if schedule:
+        if schedule:
             # Change status to COMPLETED
             sqliteMgr.update_status(schedule, COMPLETED)
 
         del sqliteMgr
         return schedule
 
-    def send_result(self, message: str, request_id: str = "", job_id: str = "") -> None:  # pragma: no cover
+    def send_result(self, message: str, request_id: str = "") -> None:  # pragma: no cover
         """Sends result to local MQTT channel
 
         Raises ValueError if request_id contains a slash
@@ -81,14 +83,8 @@ def send_result(self, message: str, request_id: str = "", job_id: str = "") -> N
             logger.error('Cannot send result: dispatcher core not initialized')
             return
 
-        schedule = None
-        # Check if this is a request stored in the DB and started from the APScheduler
-        if job_id != "":
-            schedule = Schedule(request_id=request_id, job_id=job_id)
-        else:
-            # Some jobs do not call send_result to the dispatcher class to get the
-            # job_id. In this case, we need to check the DB for the job_id.
-            schedule = self._check_db_for_started_job()
+        schedule = self._check_db_for_started_job()
+        logger.debug(f"Schedule in Broker Send_result: {schedule}")
 
         if not schedule:
             # This is not a scheduled job
@@ -107,20 +103,23 @@ def send_result(self, message: str, request_id: str = "", job_id: str = "") -> N
             # Turn the message into a dict
             message_dict = json.loads(message)
         except json.JSONDecodeError as e:
-            logger.error(f"Cannot convert formatted message to dict: {message}. Error: {e}")
-            self.send_update(str(message))
+            logger.error(f"Cannot convert node update formatted message to a dict type. message={message} error={e}")
             return
 
         # Update the job_id in the message
-        message_dict['job_id'] = schedule.job_id
-
+        message_dict['jobId'] = schedule.job_id
+
         # Convert the updated message_dict back to a JSON string
         try:
             updated_message = json.dumps(message_dict)
         except (TypeError, OverflowError) as e:
             logger.error(f"Cannot convert Result back to string: {message_dict}. Error: {e}")
-            self.send_update(str(message))
-            return
+            return
+
+        is_valid = is_valid_json_structure(updated_message, NODE_UPDATE_JSON_SCHEMA_LOCATION)
+        if not is_valid:
+            logger.error(f"JSON schema validation failed while verifying node_update message: {updated_message}")
+            return
 
         logger.debug(f"Sending node update message: {str(updated_message)}")
         self.send_update(str(updated_message))
diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py
index 072a9c458..302362088 100644
--- a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py
+++ b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py
@@ -274,7 +274,7 @@ def _perform_cmd_type_operation(self, parsed_head: XmlHandler, xml: str) -> Resu
     def _telemetry(self, message: str) -> None:
         self._dispatcher_broker.telemetry(message)
 
-    def _send_result(self, message: str, request_id: str = "", job_id: str = "") -> None:
+    def _send_result(self, message: str, request_id: str = "") -> None:
         """Sends result message to local MQTT channel
 
         If request_id is specified, the message is sent to RESPONSE_CHANNEL/id instead of RESPONSE_CHANNEL
@@ -284,8 +284,8 @@ def _send_result(self, message: str, request_id: str = "") ->
         @param message: message to be published to cloud
         """
         # Check if this is a request stored in the DB and started from the APScheduler
-        logger.debug(f"Sending result message with id {request_id}: {message}")
-        self._dispatcher_broker.send_result(message, request_id, job_id)
+        logger.debug(f"Sending result message with request_id={request_id}, message={message}")
+        self._dispatcher_broker.send_result(message, request_id)
 
     def run_scheduled_job(self, schedule: Schedule, manifest: str) -> None:
 
@@ -374,7 +374,7 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st
                 self._update_logger.error = str(e)
         finally:
             logger.info('Install result: %s', str(result))
-            self._send_result(message=str(result), job_id=job_id)
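+            # send_result now resolves the job_id of any started scheduled job from the dispatcher DB.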
+            self._send_result(message=str(result))
             if result.status != CODE_OK and parsed_head:
                 self._update_logger.status = FAIL
                 self._update_logger.error = str(result)
diff --git a/inbm/dispatcher-agent/dispatcher/ota_parser.py b/inbm/dispatcher-agent/dispatcher/ota_parser.py
index 97f272c66..a3fd2421d 100644
--- a/inbm/dispatcher-agent/dispatcher/ota_parser.py
+++ b/inbm/dispatcher-agent/dispatcher/ota_parser.py
@@ -12,10 +12,10 @@
 from .constants import OtaType
 from .common.uri_utilities import is_valid_uri
-from .validators import is_valid_config_params
 from .dispatcher_exception import DispatcherException
 from inbm_lib.xmlhandler import XmlException
 from inbm_lib.xmlhandler import XmlHandler
+from inbm_lib.json_validator import is_valid_json_structure
 from inbm_common_lib.constants import DEFAULT_HASH_ALGORITHM, LOCAL_SOURCE
 
 logger = logging.getLogger(__name__)
@@ -171,7 +171,7 @@ def parse(self, resource: Dict, kwargs: Dict, parsed: XmlHandler) -> Dict[str, A
         if 'import' in cmd:
             config_params = '{"execcmd":"/bin/true"}'
 
-        if config_params and not is_valid_config_params(config_params):
+        if config_params and not is_valid_json_structure(config_params):
             logger.info("Config Params not passed correctly"
                         " in manifest, rejected update")
             raise XmlException
diff --git a/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py b/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py
index 6c5d9bad7..e7eca962d 100644
--- a/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py
+++ b/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py
@@ -106,26 +106,52 @@ def get_any_started_schedule(self) -> Optional[Schedule]:
         sql = ''' SELECT
                     j.job_id,
                     j.task_id,
-                    sj.schedule_id,
-                    sj.schedule_type,
-                    COALESCE(iss.request_id, sss.request_id, rss.request_id) AS request_id
+                    isj.schedule_id,
+                    'immediate' AS schedule_type,
+                    imm.request_id
                 FROM
                     job j
                 JOIN
-                (
-                    SELECT task_id, schedule_id, 'Immediate' AS schedule_type FROM immediate_schedule_job WHERE status = 'started'
-                    UNION ALL
-                    SELECT task_id, schedule_id, 'Single' AS schedule_type FROM single_schedule_job WHERE status = 'started'
-                    UNION ALL
-                    SELECT task_id, schedule_id, 'Repeated' AS schedule_type FROM repeated_schedule_job WHERE status = 'started'
-                ) sj ON j.task_id = sj.task_id
-                LEFT JOIN
-                    immediate_schedule iss ON sj.schedule_id = iss.id AND sj.schedule_type = 'Immediate'
-                LEFT JOIN
-                    single_schedule sss ON sj.schedule_id = sss.id AND sj.schedule_type = 'Single'
-                LEFT JOIN
-                    repeated_schedule rss ON sj.schedule_id = rss.id AND sj.schedule_type = 'Repeated'
-                '''
+                    immediate_schedule_job isj ON j.task_id = isj.task_id
+                JOIN
+                    immediate_schedule imm ON isj.schedule_id = imm.id
+                WHERE
+                    isj.status = 'started'
+
+                UNION
+
+                SELECT
+                    j.job_id,
+                    j.task_id,
+                    ssj.schedule_id,
+                    'single' AS schedule_type,
+                    ss.request_id
+                FROM
+                    job j
+                JOIN
+                    single_schedule_job ssj ON j.task_id = ssj.task_id
+                JOIN
+                    single_schedule ss ON ssj.schedule_id = ss.id
+                WHERE
+                    ssj.status = 'started'
+
+                UNION
+
+                SELECT
+                    j.job_id,
+                    j.task_id,
+                    rsj.schedule_id,
+                    'repeated' AS schedule_type,
+                    rs.request_id
+                FROM
+                    job j
+                JOIN
+                    repeated_schedule_job rsj ON j.task_id = rsj.task_id
+                JOIN
+                    repeated_schedule rs ON rsj.schedule_id = rs.id
+                WHERE
+                    rsj.status = 'started';
+                '''
 
         cursor = self._conn.cursor()
         try:
@@ -141,9 +167,9 @@ def get_any_started_schedule(self) -> Optional[Schedule]:
             request_id = row[0][4]
             logger.debug(f"Schedule in 'STARTED' state has type={schedule_type}, jobID={job_id}, taskID={task_id}, scheduleID={schedule_id}, requestID={request_id}")
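+            # The rewritten UNION query labels schedule_type in lowercase; compare case-insensitively.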
-            if schedule_type == 'Immediate':
+            if schedule_type.lower() == 'immediate':
                 return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id)
-            elif schedule_type == 'Single':
+            elif schedule_type.lower() == 'single':
                 return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id, start_time=datetime.now())
             else:
                 return RepeatedSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id)
diff --git a/inbm/dispatcher-agent/dispatcher/sota/sota.py b/inbm/dispatcher-agent/dispatcher/sota/sota.py
index 8a562fdfa..8c1d32eae 100644
--- a/inbm/dispatcher-agent/dispatcher/sota/sota.py
+++ b/inbm/dispatcher-agent/dispatcher/sota/sota.py
@@ -347,7 +347,7 @@ def execute_from_manifest(self,
                 sota_cache_repo.delete_all()  # clean cache directory
             if get_command_status(cmd_list) == SUCCESS:
                 self._dispatcher_broker.send_result(
-                    '{"status": 200, "message": SOTA command status: SUCCESSFUL"}')
+                    '{"status": 200, "message": "SOTA command status: SUCCESSFUL"}')
                 success = True
             else:
                 self._dispatcher_broker.telemetry(
diff --git a/inbm/dispatcher-agent/dispatcher/validators.py b/inbm/dispatcher-agent/dispatcher/validators.py
deleted file mode 100644
index c8b7cb23c..000000000
--- a/inbm/dispatcher-agent/dispatcher/validators.py
+++ /dev/null
@@ -1,56 +0,0 @@
-"""
-    Copyright (C) 2017-2024 Intel Corporation
-    SPDX-License-Identifier: Apache-2.0
-"""
-
-
-import json
-import logging
-import os
-import jsonschema
-from typing import Optional
-
-from inbm_common_lib.utility import get_canonical_representation_of_path
-
-from .constants import *
-
-logger = logging.getLogger(__name__)
-
-
-def get_schema_location(schema_type: str, schema_location: Optional[str] = None) -> str:
-    if not schema_location:
-        schema_location = JSON_SCHEMA_LOCATION
-    return schema_location
-
-
-def validate_schema(schema_type: str, params: str, schema_location: Optional[str] = None) -> str:
-    schema_location = get_schema_location(schema_type, schema_location)
-
-    if not os.path.exists(schema_location):
-        logger.error("JSON Schema file not found")
-        raise ValueError("JSON Schema file not found")
-
-    try:
-        with open(get_canonical_representation_of_path(schema_location)) as schema_file:
-            schema = json.loads(schema_file.read())
-
-        parsed = json.loads(str(params))
-        jsonschema.validate(parsed, schema)
-    except (ValueError, OSError, jsonschema.exceptions.ValidationError):
-        raise ValueError("Schema validation failed!")
-    return parsed
-
-
-def is_valid_config_params(config_params: str, schema_location: Optional[str] = None) -> bool:
-    """Schema validate the configuration parameters
-
-    @param config_params: params to be validated
-    @param schema_location: location of schema file; default=None
-    @return (bool): True if schema validated or False on failure or exception
-    """
-    try:
-        validate_schema('single', config_params, schema_location)
-    except (ValueError, KeyError, jsonschema.exceptions.ValidationError) as e:
-        logger.info("Error received: %s", str(e))
-        return False
-    return True
diff --git a/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/node_update_schema.json b/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/node_update_schema.json
new file mode 100644
index 000000000..fd2b0a451
--- /dev/null
+++ b/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/node_update_schema.json
@@ -0,0 +1,21 @@
+{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "properties": {
+        "status": {
+            "type": "integer",
+            "description": "The HTTP status code of the response."
+        },
+        "message": {
+            "type": "string",
+            "description": "A message describing the result of the command."
+        },
+        "jobId": {
+            "type": "string",
+            "description": "A unique identifier for the job.",
+            "pattern": "^[a-z0-9-]+$"
+        }
+    },
+    "required": ["status", "message", "jobId"],
+    "additionalProperties": false
+}
\ No newline at end of file
diff --git a/inbm/dispatcher-agent/tests/unit/aota/test_factory.py b/inbm/dispatcher-agent/tests/unit/aota/test_factory.py
index 33ce157ea..e2ad56bdc 100644
--- a/inbm/dispatcher-agent/tests/unit/aota/test_factory.py
+++ b/inbm/dispatcher-agent/tests/unit/aota/test_factory.py
@@ -1,6 +1,6 @@
 from unittest import TestCase
 from unit.common.mock_resources import *
-from unittest.mock import patch, Mock
+from unittest.mock import patch
 from dispatcher.aota.factory import get_app_instance, get_app_os
 from dispatcher.aota.aota_command import DockerCompose, Docker
@@ -8,7 +8,6 @@
 from dispatcher.aota.aota_error import AotaError
 from dispatcher.aota.application_command import CentOsApplication, UbuntuApplication
 
-from .test_aota_command import TestAotaCommand
 
 DOCKER_COMPOSE_PARSED_MANIFEST = {'config_params': None, 'version': None, 'container_tag': 'abc',
                                   'uri': 'http://sample/test.tar.gz',
diff --git a/inbm/dispatcher-agent/tests/unit/common/mock_resources.py b/inbm/dispatcher-agent/tests/unit/common/mock_resources.py
index 9a5df191b..76ff389e7 100644
--- a/inbm/dispatcher-agent/tests/unit/common/mock_resources.py
+++ b/inbm/dispatcher-agent/tests/unit/common/mock_resources.py
@@ -289,7 +289,7 @@ def __init__(self) -> None:
     def start(self, tls: bool) -> None:
         pass
 
-    def send_result(self, message: str, id: str = "", job_id: str = "") -> None:
+    def send_result(self, message: str, id: str = "") -> None:
         pass
 
     def mqtt_publish(self, topic: str, payload: Any, qos: int = 0, retain: bool = False) -> None:
diff --git a/inbm/dispatcher-agent/tests/unit/config/test_config_operation.py b/inbm/dispatcher-agent/tests/unit/config/test_config_operation.py
index 46be1f234..4a349cc74 100644
--- a/inbm/dispatcher-agent/tests/unit/config/test_config_operation.py
+++ b/inbm/dispatcher-agent/tests/unit/config/test_config_operation.py
@@ -3,6 +3,7 @@
 from dispatcher.common.result_constants import CONFIG_LOAD_FAIL_WRONG_PATH, CONFIG_LOAD_SUCCESS
 from dispatcher.config.config_operation import ConfigOperation
 from inbm_lib.xmlhandler import XmlHandler
+from unittest.mock import MagicMock, patch
 
 TEST_SCHEMA_LOCATION = os.path.join(os.path.dirname(__file__),
                                     '../../../fpm-template/usr/share/dispatcher-agent/'
@@ -10,6 +11,10 @@
 
 
 # Parameterize the test function to run it with different inputs and expected outputs
+@pytest.fixture
+def config_operation():
+    mock_dispatcher_broker = MagicMock()
+    return ConfigOperation(mock_dispatcher_broker)
 
 @pytest.mark.parametrize(
     "xml_path, expected_result",
@@ -18,12 +23,11 @@
         ('/var/cache/abc/intel.conf', CONFIG_LOAD_FAIL_WRONG_PATH),
     ]
 )
-def test_config_load_operation(mocker, xml_path, expected_result):
+def test_config_load_operation(mocker, config_operation, xml_path, expected_result):
     xml = f'<?xml version="1.0" encoding="utf-8"?><manifest><type>config</type><config>' \
          f'<cmd>load</cmd><configtype><load><path>{xml_path}</path></load></configtype>' \
          f'</config></manifest>'
     parsed_head = XmlHandler(xml, is_file=False, schema_location=TEST_SCHEMA_LOCATION)
-    c = ConfigOperation(dispatcher_broker=mocker.Mock())
 
     mock_download = mocker.patch(
         'dispatcher.configuration_helper.ConfigurationHelper.download_config')
@@ -37,4 +41,35 @@ def test_config_load_operation(mocker, config_operation, xml_path, expected_result):
     mock_req_conf_func.assert_not_called()
 
     # Run the test and check the result
-    assert expected_result == c._do_config_install_load(parsed_head=parsed_head, xml=xml)
+    assert expected_result == config_operation._do_config_install_load(parsed_head=parsed_head, xml=xml)
+
+def test_do_config_install_update_config_items_append(monkeypatch, config_operation):
+    # Define the command type and value object for the test
+    config_cmd_type = 'append'
+    value_object = 'trustedRepositories'
+
+    mock_request_config_agent = MagicMock()
+    monkeypatch.setattr(config_operation, 'request_config_agent', mock_request_config_agent)
+
+    result = config_operation._do_config_install_update_config_items(config_cmd_type, value_object)
+
+    # Assert that the request_config_agent method was called with the correct arguments
+    mock_request_config_agent.assert_called_once_with(config_cmd_type, file_path=None, value_string=value_object)
+
+    # Assert that the result is a success
+    assert result.status == 200
+    assert result.message == 'Configuration append command: SUCCESSFUL'
+
+def test_do_config_install_update_config_items_remove_failure(monkeypatch, config_operation):
+    # Define the command type and value object for the test
+    config_cmd_type = 'remove'
+    value_object = 'invalid_config_path'
+
+    mock_request_config_agent = MagicMock()
+    monkeypatch.setattr(config_operation, 'request_config_agent', mock_request_config_agent)
+
+    result = config_operation._do_config_install_update_config_items(config_cmd_type, value_object)
+
+    # Assert that the result is a failure
+    assert result.status == 400
+    assert 'FAILED' in result.message
diff --git a/inbm/dispatcher-agent/tests/unit/packagemanager/test_package_manager.py b/inbm/dispatcher-agent/tests/unit/packagemanager/test_package_manager.py
index 2089ca1d5..b313c1619 100644
--- a/inbm/dispatcher-agent/tests/unit/packagemanager/test_package_manager.py
+++ b/inbm/dispatcher-agent/tests/unit/packagemanager/test_package_manager.py
@@ -8,13 +8,13 @@
 from cryptography.hazmat.primitives import hashes
 from cryptography.hazmat.primitives.asymmetric import padding
 from cryptography.hazmat.primitives.asymmetric import rsa
-from unittest.mock import patch
+from unittest.mock import patch, mock_open, MagicMock
 from tarfile import TarFile
 
 from dispatcher.dispatcher_exception import DispatcherException
 from dispatcher.packagemanager import package_manager
 from dispatcher.packagemanager.memory_repo import MemoryRepo
-from dispatcher.packagemanager.package_manager import extract_files_from_tar, \
+from dispatcher.packagemanager.package_manager import extract_files_from_tar, DispatcherBroker, \
     _get_checksum, get_file_type, verify_signature, verify_source, _get_ext, \
     _is_valid_file, _verify_checksum_with_key, _is_source_match_trusted_repo, _parse_config_result
 from dispatcher.common.result_constants import Result
@@ -148,6 +148,50 @@ def test_checksum_none(self, mock_checksum, mock_open, mock_valid_file, mockup)
         verify_signature(
             "signature", "path/to/file.tar", MockDispatcherBroker.build_mock_dispatcher_broker(), 384)
 
+    @patch('dispatcher.packagemanager.package_manager._verify_checksum_with_key')
+    @patch('dispatcher.packagemanager.package_manager._get_checksum')
+    @patch("dispatcher.packagemanager.package_manager.extract_files_from_tar")
+    @patch('dispatcher.packagemanager.package_manager.load_pem_x509_certificate')
+    @patch('builtins.open', new_callable=mock_open, read_data='file content')
+    def test_verify_signature_success(self, mock_file, mock_load_cert, mock_extract,
+                                      mock_checksum, mock_verify) -> None:
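+        # Happy path: every crypto and file operation is mocked out, so only control flow is exercised.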
+        files = [MockFile('x'), MockFile('y')]
+        mock_extract.return_value = files, MockTar('tar')
+
+        mock_checksum.return_value = hashlib.sha384(b'abc').hexdigest()
+
+        # Mock the DispatcherBroker
+        mock_broker = MagicMock(spec=DispatcherBroker)
+        mock_broker.telemetry = MagicMock()
+
+        # Mock the load_pem_x509_certificate function to return a mock certificate object
+        mock_cert = MagicMock()
+        mock_load_cert.return_value = mock_cert
+
+        # Mock the public_key method on the mock certificate object to return a mock public key
+        mock_public_key = MagicMock()
+        mock_cert.public_key.return_value = mock_public_key
+
+        # Mock the verify method on the mock public key to do nothing (successful verification)
+        mock_public_key.verify = MagicMock()
+
+        # Define the signature, path to file, and hash algorithm for the test
+        signature = 'signature'
+        path_to_file = '/path/to/package.tar'
+        hash_algorithm = 384
+
+        # Call the verify_signature function
+        verify_signature(signature, path_to_file, mock_broker, hash_algorithm)
+
+        # Assert that the certificate was loaded
+        mock_load_cert.assert_called_once()
+
+        # Assert that the public key was used to verify the signature
+        mock_verify.assert_called_once()
+
+        # Assert that the telemetry method was called with the success message
+        mock_broker.telemetry.assert_called_with('Signature check passed.')
+
     @patch("dispatcher.packagemanager.package_manager.extract_files_from_tar")
     def test_verify_signature_cert_package_not_found(self, mockup) -> None:
         files = [MockFile('x'), MockFile('y')]
diff --git a/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py b/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py
index fbf8f1713..fac3691f3 100644
--- a/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py
+++ b/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py
@@ -117,7 +117,21 @@ def test_update_single_schedule_status_to_scheduled(db_connection: SqliteManager
     results = db_connection.get_single_schedules_in_priority_order()
     assert len(results) == 0
 
-def test_update_repeated_schedule_statu_to_scheduled(db_connection: SqliteManager):
+def test_get_started_immediate_scheduled_job(db_connection: SqliteManager):
+    iss = SingleSchedule(request_id=REQUEST_ID,
+                         job_id=JOB_ID,
+                         manifests=["MANIFEST1"])
+    db_connection.clear_database()
+
+    db_connection.create_schedule(iss)
+    # SQL call only gets results that don't have a status.
+    results = db_connection.get_immediate_schedules_in_priority_order()
+    assert len(results) == 1
+    db_connection.update_status(results[0], "started")
+    imm_sched = db_connection.get_any_started_schedule()
+    assert imm_sched == SingleSchedule(request_id=REQUEST_ID, job_id=JOB_ID, task_id=1, schedule_id=1)
+
+def test_update_repeated_schedule_status_to_scheduled(db_connection: SqliteManager):
     rs = RepeatedSchedule(request_id=REQUEST_ID,
                           job_id=JOB_ID,
                           cron_duration="P7D",
                           cron_minutes="0",
diff --git a/inbm/dispatcher-agent/tests/unit/test_validators.py b/inbm/dispatcher-agent/tests/unit/test_validators.py
deleted file mode 100644
index a19ed8151..000000000
--- a/inbm/dispatcher-agent/tests/unit/test_validators.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import os
-import unittest
-from unittest import TestCase
-
-from dispatcher.validators import is_valid_config_params, get_schema_location
-
-TEST_JSON_SCHEMA_LOCATION = os.path.join(os.path.dirname(__file__),
-                                         '../../fpm-template/usr/share/dispatcher-agent/'
-                                         'config_param_schema.json')
-
-
-class TestValidators(TestCase):
-
-    def test_json_parse_one_param_pass(self) -> None:
-        config_params = '{"execcmd":"abc"}'
-        result = is_valid_config_params(config_params, TEST_JSON_SCHEMA_LOCATION)
-        self.assertTrue(result is True)
-
-    def test_json_parse_two_param_pass(self) -> None:
-        config_params = '{"execcmd":"abc", "device":["abcd","def"]}'
-        result = is_valid_config_params(config_params, TEST_JSON_SCHEMA_LOCATION)
-        self.assertTrue(result is True)
-
-    def test_json_parse_param_fail(self) -> None:
-        config_params = '{"privileged":["yes"]}'
-        result = is_valid_config_params(config_params, TEST_JSON_SCHEMA_LOCATION)
-        self.assertTrue(result is False)
-
-    def test_json_parse_no_param_fail(self) -> None:
-        config_params = ''
-        result = is_valid_config_params(config_params, TEST_JSON_SCHEMA_LOCATION)
-        self.assertTrue(result is False)
-
-    def test_json_wrong_schema_location(self) -> None:
-        config_params = ''
-        result = is_valid_config_params(config_params, '')
-        self.assertTrue(result is False)
-
-    def test_get_schema_returns_passed_schema(self) -> None:
-        self.assertEqual(get_schema_location(
-            'single', 'test_schema.json'), 'test_schema.json')
-
-    def test_get_schema_returns_single_schema(self) -> None:
-        self.assertEqual(get_schema_location('single'),
-                         '/usr/share/dispatcher-agent/config_param_schema.json')
-
-
-if __name__ == '__main__':
-    unittest.main()
diff --git a/inbm/dockerfiles/Dockerfile-check.m4 b/inbm/dockerfiles/Dockerfile-check.m4
index 9d3b222c4..42282e6cb 100644
--- a/inbm/dockerfiles/Dockerfile-check.m4
+++ b/inbm/dockerfiles/Dockerfile-check.m4
@@ -58,7 +58,9 @@ RUN source /venv-py3/bin/activate && \
 FROM venv-py3 AS test-inbm-lib
 WORKDIR /src/inbm-lib
 # for unit test
-COPY inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd /src/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd
+COPY inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd /src/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd
+COPY inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/node_update_schema.json /src/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/node_update_schema.json
+COPY inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/config_param_schema.json /src/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/config_param_schema.json
 RUN source /venv-py3/bin/activate && \
     cd /src/inbm-lib && \
     set -o pipefail && \
diff --git a/inbm/packaging/yocto/common/usr.bin.inbm-cloudadapter b/inbm/packaging/yocto/common/usr.bin.inbm-cloudadapter
index c83563fc7..10e0341ec 100644
--- a/inbm/packaging/yocto/common/usr.bin.inbm-cloudadapter
+++ b/inbm/packaging/yocto/common/usr.bin.inbm-cloudadapter
@@ -37,6 +37,7 @@
   /etc/intel-manageability/public/cloudadapter-agent/** r,
   /etc/intel-manageability/secret/cloudadapter-agent/** r,
   /usr/share/cloudadapter-agent/config_schema.json r,
+  /usr/share/cloudadapter-agent/node_update_schema.json r,
   /etc/intel-manageability/public/cloudadapter-agent/device_id rw,
   /etc/intel-manageability/public/mqtt-ca/mqtt-ca.crt r,
   /var/tmp/* rw,