diff --git a/docs/Dispatcher Scheduling DB Design.md b/docs/Dispatcher Scheduling DB Design.md index 072cd4f58..03219c8ea 100644 --- a/docs/Dispatcher Scheduling DB Design.md +++ b/docs/Dispatcher Scheduling DB Design.md @@ -37,14 +37,20 @@ erDiagram TEXT status "NULL or scheduled" } + IMMEDIATE_SCHEDULE { + INTEGER id PK "AUTOINCREMENT" + TEXT request_id + } + SINGLE_SCHEDULE { INTEGER id PK "AUTOINCREMENT" - TEXT request_id "NOT NULL - Format -> 2024-01-01T00:00:00" + TEXT request_id TEXT start_time "NOT NULL - Format -> 2024-01-01T00:00:00" - TEXT end_time + TEXT end_time "NOT NULL - Format -> 2024-01-01T00:00:00" } JOB ||--o{ SINGLE_SCHEDULE_JOB: performs JOB ||--o{ REPEATED_SCHEDULE_JOB : performs + JOB ||--o{ IMMEDIATE_SCHEDULE_JOB :performs JOB { INTEGER task_id PK "AUTOINCREMENT" @@ -52,6 +58,14 @@ erDiagram TEXT manifest "NOT NULL" } + IMMEDIATE_SCHEDULE ||--|{ IMMEDIATE_SCHEDULE_JOB : schedules + IMMEDIATE_SCHEDULE_JOB { + INTEGER priority "Order the job manifests should run - Starting with 0" + INTEGER schedule_id PK,FK "REFERENCES IMMEDIATE_SCHEDULE(schedule_id)" + INTEGER task_id PK,FK "REFERENCES job(task_id)" + TEXT status "NULL or scheduled" + } + REPEATED_SCHEDULE ||--|{ REPEATED_SCHEDULE_JOB : schedules REPEATED_SCHEDULE_JOB { INTEGER priority "Order the job manifests should run" @@ -85,6 +99,14 @@ erDiagram | 4 | fwupd-718814f3-b12a-432e-ac38-093e8dcb4bd1 | valid Inband Manageability XML manifest - FOTA | | 5 | setpwr-d8be8ae4-7512-43c0-9bdd-9a066de17322 | valid Inband Manageability XML manifest - Reboot system | +### IMMEDIATE_SCHEDULE Table + +| id | request_id | +| :---- | :---- | +| 1 | 6bf587ac-1d70-4e21-9a15-097f6292b9c4 | +| 2 | 6bf587ac-1d70-4e21-9a15-097f6292b9c4 | +| 3 | c9b74125-f3bb-440a-ad80-8d02090bd337 | + ### SINGLE_SCHEDULE Table | id | request_id | start_time | end_time | @@ -104,6 +126,16 @@ NOTE: These values may not make sense in the real world. Just for demonstration | 3 | c9b74125-f3bb-440a-ad80-8d02090bd337 | P2D | 0 | */8 | * | * | * | | 4 | c9b74125-f3bb-440a-ad80-8d02090bd337 | P14D | 0 | * | * | * | * | +### IMMEDIATE_SCHEDULE_JOB Table + +Example: To do a download, install, and reboot of SOTA at the time in schedule 1 + +| priority | schedule_id | task_id | status | +| :---- | :---- | :---- | :----- | +| 0 | 1 | 1 | scheduled | +| 1 | 1 | 2 | scheduled | +| 2 | 1 | 3 | | + ### SINGLE_SCHEDULE_JOB Table Example: To do a download, install, and reboot of SOTA at the time in schedule 1 diff --git a/inbm-lib/inbm_common_lib/constants.py b/inbm-lib/inbm_common_lib/constants.py index e4fb75ac5..5c483bd9a 100644 --- a/inbm-lib/inbm_common_lib/constants.py +++ b/inbm-lib/inbm_common_lib/constants.py @@ -20,6 +20,8 @@ RESPONSE_CHANNEL = 'manageability/response' EVENT_CHANNEL = 'manageability/event' TELEMETRY_CHANNEL = 'manageability/telemetry' +# Used for Node updates to be sent to UDM +UPDATE_CHANNEL = 'manageability/update' CONFIG_CHANNEL = 'ma/configuration/update/' # Request constants diff --git a/inbm/cloudadapter-agent/README.md b/inbm/cloudadapter-agent/README.md index e0f3a45d9..99e80295a 100644 --- a/inbm/cloudadapter-agent/README.md +++ b/inbm/cloudadapter-agent/README.md @@ -49,6 +49,7 @@ The agent subscribes to the following topics: - Agent events: `manageability/event` - Responses: `manageability/response` - Device telemetry: `manageability/telemetry` + - Update from scheduled requests: `manageability/update` ❗`+` is a wild-card indicating single level thus matching `diagnostic/state` or `/state` diff --git a/inbm/cloudadapter-agent/cloudadapter/client.py b/inbm/cloudadapter-agent/cloudadapter/client.py index 07e4b64d4..0800b1fdc 100644 --- a/inbm/cloudadapter-agent/cloudadapter/client.py +++ b/inbm/cloudadapter-agent/cloudadapter/client.py @@ -62,6 +62,10 @@ def _bind_agent_to_cloud(self) -> None: TC_TOPIC.EVENT, lambda _, payload: self._cloud_publisher.publish_event(payload) ) + self._broker.bind_callback( + TC_TOPIC.UPDATE, + lambda _, payload: self._cloud_publisher.publish_update(payload) + ) def _bind_ucc_to_agent(self) -> None: logger.debug("Binding cloud to Command") @@ -163,6 +167,7 @@ def stop(self) -> None: logger.debug("Stopping cloudadapter client") self._broker.stop() self._cloud_publisher.publish_event("Disconnected") + self._cloud_publisher.publish_update("Disconnected") try: logger.debug("Calling disconnect on adapter") self._adapter.disconnect() diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/adapter.py b/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/adapter.py index ad6cb204a..e70520b31 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/adapter.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/adapters/adapter.py @@ -53,6 +53,14 @@ def bind_callback(self, name: str, callback: Callable) -> None: """ pass + def publish_update(self, message: str) -> None: + """Publishes an update to the cloud + + @param message: (str) The update message to send + @exception PublishError: If publish fails + """ + self._client.publish_update("update", message) + def publish_event(self, message: str) -> None: """Publishes an event to the cloud diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/client/cloud_client.py b/inbm/cloudadapter-agent/cloudadapter/cloud/client/cloud_client.py index 456e1c693..362d2fe58 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/client/cloud_client.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/client/cloud_client.py @@ -8,23 +8,28 @@ from .handlers.receive_respond_handler import ReceiveRespondHandler from typing import Callable, Optional from datetime import datetime +import logging +logger = logging.getLogger(__name__) class CloudClient: def __init__(self, connection: MQTTConnection, telemetry: OneWayMessenger, event: OneWayMessenger, - attribute: OneWayMessenger, handler: ReceiveRespondHandler) -> None: + update: OneWayMessenger | None, attribute: OneWayMessenger, + handler: ReceiveRespondHandler) -> None: """Constructor for CloudClient @param connection: Connection associated with this CloudClient @param telemetry: Messenger to send telemetry @param event: Messenger to send events + @param update: Messenger to send updates @param attribute: Messenger to send attributes @param handler: Handler to deal with cloud method calls """ self._connection = connection self._telemetry = telemetry self._event = event + self._update: OneWayMessenger | None = update self._attribute = attribute self._handler = handler @@ -52,6 +57,19 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None: """ return self._telemetry.publish(key, value, time) + def publish_update(self, key: str, value: str) -> None: + """Publishes an update to the cloud + + @param key: key to publish + @param value: update to publish + @exception PublishError: If publish fails + """ + if self._update is None: + logger.error("Received update publish request but no update messenger is configured") + return None + else: + return self._update.publish(key, value) + def publish_event(self, key: str, value: str) -> None: """Publishes an event to the cloud diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py b/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py index d9dc0a890..cdd7d4739 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/client/inbs_cloud_client.py @@ -4,23 +4,24 @@ """ from collections.abc import Generator +import json import queue import random import threading -import time -from typing import Callable, Optional, Any +from google.protobuf.timestamp_pb2 import Timestamp +from typing import Callable, Optional from datetime import datetime from cloudadapter.cloud.adapters.inbs.operation import ( convert_updated_scheduled_operations_to_dispatcher_xml, ) from cloudadapter.constants import METHOD, DEAD -from cloudadapter.exceptions import AuthenticationError +from cloudadapter.exceptions import AuthenticationError, PublishError from cloudadapter.pb.inbs.v1 import inbs_sb_pb2_grpc, inbs_sb_pb2 from cloudadapter.pb.common.v1 import common_pb2 import logging -import grpc +import grpc # type: ignore from .cloud_client import CloudClient logger = logging.getLogger(__name__) @@ -68,9 +69,10 @@ def __init__( raise AuthenticationError( "TLS certificate path is required when TLS is enabled." ) - self._stop_event = threading.Event() + self._grpc_channel: grpc.Channel | None = None # this will get set after connect is called + def get_client_id(self) -> Optional[str]: """A readonly property @@ -112,6 +114,53 @@ def publish_telemetry(self, key: str, value: str, time: datetime) -> None: pass # INBS is not yet ready to receive telemetry + def publish_update(self, key: str, value: str) -> None: + """Publishes an update to the cloud + + @param message: node update message to publish + @exception PublishError: If publish fails + """ + if self._grpc_channel is None: + raise PublishError("gRPC channel not set up before calling InbsCloudClient.publish_update") + + # Turn the message into a dict + logger.debug(f"Received node update: key={key}, value={value}") + try: + message_dict = json.loads(value) + except json.JSONDecodeError as e: + logger.error(f"Cannot convert formatted message to dict: {value}. Error: {e}") + return + + status_code=message_dict.get("status", "") + job_state =common_pb2.Job.JobState.PASSED \ + if status_code == 200 \ + else common_pb2.Job.JobState.FAILED + + result_messages = json.dumps(message_dict.get("message", "")) + + timestamp = Timestamp() + timestamp.GetCurrentTime() + job=common_pb2.Job( + job_id=message_dict.get("job_id", ""), + node_id=self._client_id, + status_code=status_code, + result_msgs=result_messages, + actual_end_time=timestamp, + job_state=job_state + ) + + request = inbs_sb_pb2.SendNodeUpdateRequest( + request_id="notused", + job_update=job, + ) + logger.debug(f"Sending node update to INBS: request={request}") + + try: + response = self._grpc_channel.SendNodeUpdate(request, metadata=self._metadata) + logger.info(f"Received response from gRPC server: {response}") + except grpc.RpcError as e: + logger.error(f"Failed to send node update via gRPC: {e}") + def publish_event(self, key: str, value: str) -> None: """Publishes an event to the cloud @@ -144,7 +193,7 @@ def bind_callback(self, name: str, callback: Callable) -> None: # for now ignore all callbacks; only Ping is supported self._callbacks[name] = callback - + def _handle_inbm_command_request( self, request_queue: queue.Queue[inbs_sb_pb2.HandleINBMCommandRequest | None] ) -> Generator[inbs_sb_pb2.HandleINBMCommandResponse, None, None]: @@ -255,8 +304,9 @@ def _handle_inbm_command_request( break logger.debug("Exiting _handle_inbm_command_request") - def _do_socket_connect(self): - """Handle the socket/TLS/HTTP connection to the gRPC server.""" + def _make_grpc_channel(self) -> grpc.Channel: + """Handle the socket/TLS/HTTP connection to the gRPC server. + Assumption: should not connect until first gRPC command.""" if self._tls_enabled: # Create a secure channel with SSL credentials logger.debug("Setting up connection to INBS cloud with TLS enabled") @@ -270,30 +320,36 @@ def _do_socket_connect(self): self.channel = grpc.insecure_channel( f"{self._grpc_hostname}:{self._grpc_port}" ) - - self.stub = inbs_sb_pb2_grpc.INBSSBServiceStub(self.channel) + logger.info( f"Connection set up for {self._grpc_hostname}:{self._grpc_port}; will attempt TCP connection on first request." ) + return inbs_sb_pb2_grpc.INBSSBServiceStub(self.channel) def connect(self): + # set up the gRPC channel + self._grpc_channel = self._make_grpc_channel() + # Start the background thread self.background_thread = threading.Thread(target=self._run) self.background_thread.start() def _run(self): # pragma: no cover # multithreaded operation not unit testable """INBS cloud loop. Intended to be used inside a background thread.""" + + if self._grpc_channel is None: + raise RuntimeError("gRPC channel not set up before calling InbsCloudClient._run") + backoff = 0.1 # Initial fixed backoff delay in seconds max_backoff = 4.0 # Maximum backoff delay in seconds while not self._stop_event.is_set(): logger.debug("InbsCloudClient _run loop") try: - self._do_socket_connect() request_queue: queue.Queue[ inbs_sb_pb2.HandleINBMCommandRequest | None ] = queue.Queue() - stream = self.stub.HandleINBMCommand( + stream = self._grpc_channel.HandleINBMCommand( self._handle_inbm_command_request(request_queue), metadata=self._metadata, ) diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py b/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py index 127a3c978..1f18d8fb6 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_builders.py @@ -143,6 +143,7 @@ def build_messenger_with_config(config: Dict[str, Any]): telemetry = config.get("telemetry") attribute = config.get("attribute") event = config.get("event") + update = config.get("update") if telemetry: telemetry = build_messenger_with_config(telemetry) @@ -154,6 +155,10 @@ def build_messenger_with_config(config: Dict[str, Any]): else: raise ClientBuildError( "Missing 'attribute' MQTT config information while setting up cloud connection.") + if update: + update = build_messenger_with_config(update) + else: + logger.debug("Missing 'update' MQTT config information while setting up cloud connection. TODO: figure out why this doesn't happen for INBS, but does happen for other clouds.") if event: event = build_messenger_with_config(event) else: @@ -200,5 +205,6 @@ def build_messenger_with_config(config: Dict[str, Any]): connection=connection, telemetry=telemetry, event=event, + update=update, attribute=attribute, handler=handler) diff --git a/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py b/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py index c10c6d681..19193c57e 100644 --- a/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py +++ b/inbm/cloudadapter-agent/cloudadapter/cloud/cloud_publisher.py @@ -72,3 +72,14 @@ def publish_telemetry(self, message: str) -> None: self._adapter.publish_attribute(key, value) except PublishError as e: logger.error(str(e)) + + def publish_update(self, message: str) -> None: + """Send node update to UDM + + @param message: (str) JSON formatted SendNodeUpdateRequest + """ + logger.debug(f"Received node update: {message}") + try: + self._adapter.publish_update(message) + except PublishError as e: + logger.error(str(e)) diff --git a/inbm/cloudadapter-agent/cloudadapter/constants.py b/inbm/cloudadapter-agent/cloudadapter/constants.py index 836dde7d3..a31319bfd 100644 --- a/inbm/cloudadapter-agent/cloudadapter/constants.py +++ b/inbm/cloudadapter-agent/cloudadapter/constants.py @@ -5,7 +5,7 @@ SPDX-License-Identifier: Apache-2.0 """ -from inbm_common_lib.constants import TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL +from inbm_common_lib.constants import UPDATE_CHANNEL, TELEMETRY_CHANNEL, RESPONSE_CHANNEL, EVENT_CHANNEL from inbm_lib.constants import DOCKER_STATS from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_ETC_PATH_PREFIX from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_SHARE_PATH_PREFIX, BROKER_ETC_PATH @@ -39,7 +39,7 @@ class TC_TOPIC: STATE = tuple([STATE_CHANNEL]) TELEMETRY = tuple([TELEMETRY_CHANNEL]) # Shared by TC and UCC EVENT = tuple([EVENT_CHANNEL, RESPONSE_CHANNEL]) # TODO: What's up with response? - + UPDATE = tuple([UPDATE_CHANNEL]) # Used for Node updates to be sent to UDM # ========== Publishing channels diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/client/connections/test_mqtt_connection.py b/inbm/cloudadapter-agent/tests/unit/cloud/client/connections/test_mqtt_connection.py index 87157d0ec..c8ec0de96 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/client/connections/test_mqtt_connection.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/client/connections/test_mqtt_connection.py @@ -19,13 +19,13 @@ class TestMQTTConnection(unittest.TestCase): @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.Waiter', autospec=True) - @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.mqtt', autospec=True) + @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.MQTTConnection._create_mqtt_client', autospec=True) def setUp(self, mock_mqtt, MockWaiter) -> None: mock_tls_config = mock.create_autospec(TLSConfig).return_value mock_proxy_config = mock.create_autospec(ProxyConfig).return_value mock_proxy_config.endpoint = ("end.point", 42) self.mock_waiter = MockWaiter.return_value - self.mock_client = mock_mqtt.Client.return_value + self.mock_client = mock_mqtt.return_value self.mqtt_connection = MQTTConnection( username="username", hostname="hostname", @@ -36,13 +36,13 @@ def setUp(self, mock_mqtt, MockWaiter) -> None: proxy_config=mock_proxy_config) @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.Waiter', autospec=True) - @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.mqtt', autospec=True) + @mock.patch('cloudadapter.cloud.client.connections.mqtt_connection.MQTTConnection._create_mqtt_client', autospec=True) def test_no_proxy_config(self, mock_mqtt, MockWaiter) -> None: mock_tls_config = mock.create_autospec(TLSConfig).return_value mock_proxy_config = mock.create_autospec(ProxyConfig).return_value mock_proxy_config.endpoint = None self.mock_waiter = MockWaiter.return_value - self.mock_client = mock_mqtt.Client.return_value + self.mock_client = mock_mqtt.return_value self.mqtt_connection = MQTTConnection( username="username", hostname="hostname", diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_cloud_client.py b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_cloud_client.py index 8b84b3294..d72f6ecca 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_cloud_client.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_cloud_client.py @@ -25,12 +25,14 @@ def setUp(self) -> None: self.mock_telemetry = mock.create_autospec(Messenger) self.mock_attribute = mock.create_autospec(Messenger) self.mock_event = mock.create_autospec(Messenger) + self.mock_update = mock.create_autospec(Messenger) self.mock_handler = mock.create_autospec(Handler) self.cloud_client = CloudClient( connection=self.mock_connection, telemetry=self.mock_telemetry, event=self.mock_event, + update=self.mock_update, attribute=self.mock_attribute, handler=self.mock_handler ) @@ -45,6 +47,11 @@ def test_publish_attribute_succeeds(self) -> None: self.cloud_client.publish_attribute(*args) assert self.mock_attribute.publish.call_count == 1 + def test_publish_update_succeeds(self) -> None: + args = ("key", "value") + self.cloud_client.publish_update(*args) + assert self.mock_update.publish.call_count == 1 + def test_publish_event_succeeds(self) -> None: args = ("key", "value") self.cloud_client.publish_event(*args) diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py index 252e5e7f8..dce7bdd3d 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/client/test_inbs_cloud_client.py @@ -1,11 +1,13 @@ +import threading import pytest from mock import MagicMock, Mock, patch import queue -import grpc +from cloudadapter.exceptions import PublishError +import grpc # type: ignore from datetime import datetime from typing import Generator -from cloudadapter.constants import METHOD, RUNNING, DEAD +from cloudadapter.constants import RUNNING, DEAD from cloudadapter.pb.inbs.v1 import inbs_sb_pb2 from cloudadapter.pb.common.v1 import common_pb2 from cloudadapter.cloud.client.inbs_cloud_client import InbsCloudClient @@ -49,7 +51,33 @@ def test_publish_telemetry(self, inbs_client: InbsCloudClient) -> None: inbs_client.publish_telemetry( key="example_key", value="example_value", time=datetime.now() ) - + + def test_publish_update(self, inbs_client: InbsCloudClient) -> None: + mock_channel = MagicMock() + mock_channel.SendNodeUpdateRequest.return_value = "MockResponse" + inbs_client._grpc_channel = mock_channel + + key = 'test-key' + value = '{"job_id": "12345", "status": 200, "message": "Update successful"}' + + # Call the publish_update method + inbs_client.publish_update(key, value) + + # Assert that the gRPC channel's SendNodeUpdate method was called + mock_channel.SendNodeUpdate.assert_called_once() + + def test_publish_update_failure_no_grpc_channel(self, inbs_client: InbsCloudClient): + # Ensure that _grpc_channel is None to simulate the channel not being set up + inbs_client._grpc_channel = None + + # Define the key and value to be published + key = 'test-key' + value = '{"job_id": "12345", "status": 200, "message": "Update successful"}' + + # Call the publish_update method and expect a PublishError + with pytest.raises(PublishError): + inbs_client.publish_update(key, value) + def test_publish_event(self, inbs_client: InbsCloudClient) -> None: # this is not expected to do anything yet inbs_client.publish_event(key="example_event", value="event_value") @@ -142,7 +170,7 @@ def triggerschedule(xml: str, id: str, timeout: int) -> str: # Cleanup with pytest.raises(StopIteration): next(generator) - + def test_handle_command_when_dispatcher_is_not_up(self, inbs_client: InbsCloudClient) -> None: # Setup request_queue: queue.Queue[ @@ -169,27 +197,10 @@ def test_handle_command_when_dispatcher_is_not_up(self, inbs_client: InbsCloudCl error=common_pb2.Error(message="INBM Cloudadapter: Unable to process request. Please try again"), ) - def test_run_grpc_error(self, inbs_client: InbsCloudClient) -> None: - # Setup a RpcError to simulate gRPC error - with patch("grpc.insecure_channel") as mock_channel, \ - patch("threading.Event.wait", side_effect=InterruptedError) as mock_wait: - - mock_channel.side_effect = MagicMock(side_effect=grpc.RpcError()) - - # Ensure the stop event is not set initially - inbs_client._stop_event.clear() - - # Run the test expecting InterruptedError to stop the infinite loop - with pytest.raises(InterruptedError): - inbs_client._run() - - # Assert that stop_event.wait() was called - mock_wait.assert_called() - def test_run_stop_event_sets(self, inbs_client: InbsCloudClient) -> None: with patch( "cloudadapter.cloud.client.inbs_cloud_client.queue.Queue" - ) as mock_queue: + ) as mock_queue, patch.object(inbs_client, '_grpc_channel', new_callable=MagicMock): inbs_client._stop_event.set() # Act like we want to stop immediately inbs_client._run() diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_builders.py b/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_builders.py index 1026caf5a..0e0ea175a 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_builders.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_builders.py @@ -34,6 +34,10 @@ def setUp(self) -> None: "pub": "event_pub", "format": "event_format" }, + "update": { + "pub": "update_pub", + "format": "update_format" + }, "telemetry": { "pub": "telemetry_pub", "format": "telemetry_format" @@ -89,6 +93,10 @@ def setUp(self) -> None: "pub": "event_pub", "format": "event_format" }, + "update": { + "pub": "update_pub", + "format": "update_format" + }, "command": { "pub": "manageability/request/command", "format": "{ \"ts\": \"{ts}\", \"values\": {\"command\": \"{value}\"}}" diff --git a/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_publisher.py b/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_publisher.py index 5fd1f5728..b9e61b6f1 100644 --- a/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_publisher.py +++ b/inbm/cloudadapter-agent/tests/unit/cloud/test_cloud_publisher.py @@ -24,21 +24,40 @@ def setUp(self, MockedAdapter, mock_logger) -> None: self.MockedAdapter = MockedAdapter self.cloud_publisher = CloudPublisher(self.MockedAdapter("config")) + def test_publish_update_succeed(self) -> None: + update = "update" + self.cloud_publisher.publish_update(update) + + mocked = self.MockedAdapter.return_value + mocked.publish_update.assert_called_once_with(update) + + @mock.patch("cloudadapter.cloud.cloud_publisher.logger") + def test_publish_update_with_adapter__succeeds(self, mock_logger) -> None: + self.MockedAdapter.return_value.publish_update.return_value = None + self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}') + assert mock_logger.error.call_count == 0 + + @mock.patch("cloudadapter.cloud.cloud_publisher.logger") + def test_publish_update_with_adapter_fails(self, mock_logger) -> None: + self.MockedAdapter.return_value.publish_update.side_effect = PublishError("Error!") + self.cloud_publisher.publish_update('{"status": 200, "message": "COMMAND SUCCESSFUL", "job_id": "swupd-0cdce9d5-523b-43d9-8673-d54fd61498fe"}') + assert mock_logger.error.call_count == 1 + def test_publish_event_succeed(self) -> None: event = "event" self.cloud_publisher.publish_event(event) mocked = self.MockedAdapter.return_value mocked.publish_event.assert_called_once_with(event) - + @mock.patch("cloudadapter.cloud.cloud_publisher.logger") - def test_publish_event_with_adapter_success_succeeds(self, mock_logger) -> None: + def test_publish_event_with_adapter_succeeds(self, mock_logger) -> None: self.MockedAdapter.return_value.publish_event.return_value = None self.cloud_publisher.publish_event("Test Event") assert mock_logger.error.call_count == 0 @mock.patch("cloudadapter.cloud.cloud_publisher.logger") - def test_publish_event_with_adapter_fail_fails(self, mock_logger) -> None: + def test_publish_event_with_adapter_fails(self, mock_logger) -> None: self.MockedAdapter.return_value.publish_event.side_effect = PublishError("Error!") self.cloud_publisher.publish_event("Test Event") assert mock_logger.error.call_count == 1 @@ -57,7 +76,7 @@ def test_publish_telemetry_static_succeed(self) -> None: mocked.publish_attribute.assert_called_once_with("TestAttribute", "Test Value") @mock.patch("cloudadapter.cloud.cloud_publisher.logger") - def test_publish_telemetry_static_with_adapter_success_succeeds(self, mock_logger) -> None: + def test_publish_telemetry_static_with_adapter_succeeds(self, mock_logger) -> None: self.MockedAdapter.return_value.publish_attribute.return_value = None telemetry = json.dumps({ "type": "static_telemetry", @@ -69,7 +88,7 @@ def test_publish_telemetry_static_with_adapter_success_succeeds(self, mock_logge assert mock_logger.error.call_count == 0 @mock.patch("cloudadapter.cloud.cloud_publisher.logger") - def test_publish_telemetry_static_with_adapter_fail_fails(self, mock_logger) -> None: + def test_publish_telemetry_static_with_adapter_fails(self, mock_logger) -> None: self.MockedAdapter.return_value.publish_attribute.side_effect = PublishError("Error!") telemetry = json.dumps({ "type": "static_telemetry", diff --git a/inbm/dispatcher-agent/dispatcher/common/result_constants.py b/inbm/dispatcher-agent/dispatcher/common/result_constants.py index 472820ae3..cd9a36395 100644 --- a/inbm/dispatcher-agent/dispatcher/common/result_constants.py +++ b/inbm/dispatcher-agent/dispatcher/common/result_constants.py @@ -21,18 +21,21 @@ # Result object classes class Result: - __slots__ = ("status", "message", "json") + __slots__ = ("status", "message", "job_id", "json") - def __init__(self, status: int = 0, message: str = "") -> None: + def __init__(self, status: int = 0, message: str = "", job_id: str = "") -> None: """Result object containing a status code and message @param status: (int) Predefined status code - @param message: (str) Result message""" + @param message: (str) Result message + @param job_id: (str) Job ID""" self.status = status self.message = message - self.json = json.dumps({ + self.job_id = job_id + self.json = json.dumps({ "status": status, - "message": str(message) + "message": str(message), + "job_id": job_id }) def __eq__(self, other: object) -> bool: diff --git a/inbm/dispatcher-agent/dispatcher/constants.py b/inbm/dispatcher-agent/dispatcher/constants.py index cbdbd143e..bb0ef7d81 100644 --- a/inbm/dispatcher-agent/dispatcher/constants.py +++ b/inbm/dispatcher-agent/dispatcher/constants.py @@ -33,6 +33,8 @@ # Scheduler Sqlite3 DB FILE SCHEDULER_DB_FILE = str(INTEL_MANAGEABILITY_VAR_PATH_PREFIX / 'scheduler.db') SCHEDULED = "scheduled" +STARTED = "started" +COMPLETED = "completed" # Subscription channels TC_REQUEST_CHANNEL = 'manageability/request/#' diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py b/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py index 0ee12e5af..3e4005fbd 100644 --- a/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py +++ b/inbm/dispatcher-agent/dispatcher/dispatcher_broker.py @@ -5,16 +5,18 @@ Copyright (C) 2017-2024 Intel Corporation SPDX-License-Identifier: Apache-2.0 """ +import json import logging -import os from typing import Any, Optional, Callable -from dispatcher.constants import AGENT, CLIENT_CERTS, CLIENT_KEYS +from dispatcher.constants import AGENT, CLIENT_CERTS, CLIENT_KEYS, COMPLETED +from dispatcher.schedule.sqlite_manager import SqliteManager +from dispatcher.schedule.schedules import Schedule from dispatcher.dispatcher_exception import DispatcherException from inbm_lib.mqttclient.config import DEFAULT_MQTT_HOST, DEFAULT_MQTT_PORT, MQTT_KEEPALIVE_INTERVAL from inbm_lib.mqttclient.mqtt import MQTT -from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL +from inbm_common_lib.constants import RESPONSE_CHANNEL, EVENT_CHANNEL, UPDATE_CHANNEL logger = logging.getLogger(__name__) @@ -35,31 +37,93 @@ def start(self, tls: bool) -> None: # pragma: no cover self.mqttc.start() self._is_started = True - def send_result(self, message: str, id: str = "") -> None: # pragma: no cover - """Sends event messages to local MQTT channel + def send_update(self, message: str) -> None: + """Sends node update to local MQTT 'UPDATE' channel to be published + to the cloudadapter where it will be sent as a reques to INBS (service in UDM) Raises ValueError if id contains a slash @param message: message to be published to cloud - @param id: if not "", publish to RESPONSE_CHANNEL/id instead of RESPONSE_CHANNEL + @param job_id: Job ID used to track the request in both UDM and TC """ - if id: - extra_log = f" with id {id}" + logger.debug(f"Sending node update for to {UPDATE_CHANNEL} with message: {message}") + self.mqtt_publish(topic=UPDATE_CHANNEL, payload=message) + + def _check_db_for_started_job(self) -> Optional[Schedule]: + sqliteMgr = SqliteManager() + schedule = sqliteMgr.get_any_started_schedule() + logger.debug(f"Checking for started schedule in DB: schedule={schedule}") + if schedule: + # Change status to COMPLETED + sqliteMgr.update_status(schedule, COMPLETED) + + del sqliteMgr + return schedule + + def send_result(self, message: str, request_id: str = "", job_id: str = "") -> None: # pragma: no cover + """Sends result to local MQTT channel + + Raises ValueError if request_id contains a slash + + @param message: message to be published to cloud + @param request_id: if not "", publish to RESPONSE_CHANNEL/request_id instead of RESPONSE_CHANNEL + """ + if request_id: + extra_log = f" with id {request_id}" else: extra_log = "" logger.debug(f"Sending result message{extra_log}: {message}") - if "/" in id: + if "/" in request_id: raise ValueError("id cannot contain '/'") if not self.is_started(): logger.error('Cannot send result: dispatcher core not initialized') - else: - if id != "": - topic = RESPONSE_CHANNEL + "/" + id + return + + schedule = None + # Check if this is a request stored in the DB and started from the APScheduler + if job_id != "": + schedule = Schedule(request_id=request_id, job_id=job_id) + else: + # Some jobs do not call send_result to the dispatcher class to get the + # job_id. In this case, we need to check the DB for the job_id. + schedule = self._check_db_for_started_job() + + if not schedule: + # This is not a scheduled job + logger.debug(f"Sending result message with id {request_id}: {message}") + if request_id != "": + topic = RESPONSE_CHANNEL + "/" + request_id self.mqtt_publish(topic=topic, payload=message) else: self.mqtt_publish(topic=RESPONSE_CHANNEL, payload=message) + else: + # This is a scheduled job + + # TODO: add error handling NEXMANAGE-743 + + try: + # Turn the message into a dict + message_dict = json.loads(message) + except json.JSONDecodeError as e: + logger.error(f"Cannot convert formatted message to dict: {message}. Error: {e}") + self.send_update(str(message)) + return + + # Update the job_id in the message + message_dict['job_id'] = schedule.job_id + + # Convert the updated message_dict back to a JSON string + try: + updated_message = json.dumps(message_dict) + except (TypeError, OverflowError) as e: + logger.error(f"Cannot convert Result back to string: {message_dict}. Error: {e}") + self.send_update(str(message)) + return + + logger.debug(f"Sending node update message: {str(updated_message)}") + self.send_update(str(updated_message)) def mqtt_publish(self, topic: str, payload: Any, qos: int = 0, retain: bool = False) -> None: # pragma: no cover """Publish arbitrary message on arbitrary topic. diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py index 4a6a85c43..ac39016ea 100644 --- a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py +++ b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py @@ -148,16 +148,16 @@ def start(self, tls: bool = True) -> None: self._perform_startup_tasks() # Run scheduler to schedule the task during startup. - single_schedules = self.sqlite_mgr.get_all_single_schedules_in_priority_order() + single_schedules = self.sqlite_mgr.get_single_schedules_in_priority_order() logger.info(f"Total single scheduled tasks: {len(single_schedules)}") for single_schedule in single_schedules: - self.ap_scheduler.add_single_schedule_job(self.do_install, single_schedule) + self.ap_scheduler.add_single_schedule_job(self.run_scheduled_job, single_schedule) logger.debug(f"Scheduled single job: {single_schedule}") - repeated_schedules = self.sqlite_mgr.get_all_repeated_schedules_in_priority_order() + repeated_schedules = self.sqlite_mgr.get_repeated_schedules_in_priority_order() logger.info(f"Total repeated scheduled jobs: {len(repeated_schedules)}") for repeated_schedule in repeated_schedules: - self.ap_scheduler.add_repeated_schedule_job(self.do_install, repeated_schedule) + self.ap_scheduler.add_repeated_schedule_job(self.run_scheduled_job, repeated_schedule) logger.debug(f"Scheduled repeated job: {repeated_schedule}") self.ap_scheduler.start() @@ -274,18 +274,31 @@ def _perform_cmd_type_operation(self, parsed_head: XmlHandler, xml: str) -> Resu def _telemetry(self, message: str) -> None: self._dispatcher_broker.telemetry(message) - def _send_result(self, message: str, id: str = "") -> None: + def _send_result(self, message: str, request_id: str = "", job_id: str = "") -> None: """Sends result message to local MQTT channel - If id is specified, the message is sent to RESPONSE_CHANNEL/id instead of RESPONSE_CHANNEL + If request_id is specified, the message is sent to RESPONSE_CHANNEL/id instead of RESPONSE_CHANNEL - Raises ValueError if id contains a slash + Raises ValueError if request_id contains a slash @param message: message to be published to cloud """ - self._dispatcher_broker.send_result(message, id) + # Check if this is a request stored in the DB and started from the APScheduler + logger.debug(f"Sending result message with id {request_id}: {message}") + self._dispatcher_broker.send_result(message, request_id, job_id) + - def do_install(self, xml: str, schema_location: Optional[str] = None) -> Result: + def run_scheduled_job(self, schedule: Schedule, manifest: str) -> None: + """Run the scheduled job. + + @param job_id: ID of the job to run. + @param manifest: The manifest to be passed to the callback function. + """ + logger.debug(f"Running schedule of type={type(schedule)}, job with JobID={schedule.job_id}, manifest={manifest}") + self.sqlite_mgr.update_status(schedule, STARTED) + self.do_install(xml=manifest, job_id=schedule.job_id) + + def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: str = "") -> Result: """Delegates the installation to either . call a DeviceManager command . do_ota_install @@ -359,9 +372,9 @@ def do_install(self, xml: str, schema_location: Optional[str] = None) -> Result: result = Result(CODE_BAD_REQUEST, str(e)) self._update_logger.status = FAIL self._update_logger.error = str(e) - finally: + finally: logger.info('Install result: %s', str(result)) - self._send_result(str(result)) + self._send_result(message=str(result), job_id=job_id) if result.status != CODE_OK and parsed_head: self._update_logger.status = FAIL self._update_logger.error = str(result) @@ -464,6 +477,13 @@ def _verify_username_password_present(self, usr: Optional[str], pwd: Optional[st elif (usr is None) and pwd: raise DispatcherException(f'No Username sent in manifest for {ota}') + def _add_request_to_queue(self, request_type: str, manifest: str, request_id: Optional[str]) -> None: + if not self.update_queue.full(): + self.update_queue.put((request_type, manifest, request_id)) + else: + self._send_result( + str(Result(CODE_FOUND, "Request already in progress; Please try again later"))) + def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None: """Called when a message is received from cloud @@ -476,11 +496,7 @@ def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None: request_type = topic.split('/')[2] request_id = topic.split('/')[3] if len(topic.split('/')) > 3 else None manifest = payload - if not self.update_queue.full(): - self.update_queue.put((request_type, manifest, request_id)) - else: - self._send_result( - str(Result(CODE_FOUND, "OTA In Progress, Try Later"))) + self._add_request_to_queue(request_type, manifest, request_id) def _on_message(self, topic: str, payload: Any, qos: int) -> None: """Called when a message is received from _telemetry-agent @@ -725,8 +741,7 @@ def check_dispatcher_state_info(self) -> None: dispatcher_state.clear_dispatcher_state() else: self._telemetry('Dispatcher detects normal boot sequence') - - + def handle_updates(dispatcher: Any, schedule_manifest_schema=SCHEDULE_SCHEMA_LOCATION, manifest_schema=SCHEMA_LOCATION) -> None: @@ -739,12 +754,15 @@ def handle_updates(dispatcher: Any, manifest: str = message[1] if message[2]: request_id: str = message[2] + + # Dispatcher sends back the acknowledgement response before processing the immediate scheduling. + dispatcher._send_result("", request_id) if request_type == "schedule": if not request_id: dispatcher._send_result("Error: No request ID provided for schedule request.") - logger.debug("DEBUG: manifest = " + manifest) + logger.debug("DEBUG: schedule manifest = " + manifest) try: schedule = ScheduleManifestParser(manifest, schedule_manifest_schema, manifest_schema) except XmlException as e: @@ -759,37 +777,34 @@ def handle_updates(dispatcher: Any, dispatcher.ap_scheduler.remove_all_jobs() # Add schedules to the database - if schedule.single_scheduled_requests or schedule.repeated_scheduled_requests: - def process_scheduled_requests(scheduled_requests: Sequence[Schedule]): - with sql_lock: - for requests in scheduled_requests: - dispatcher.sqlite_mgr.create_schedule(requests) - all_scheduled_requests = schedule.single_scheduled_requests + schedule.repeated_scheduled_requests - process_scheduled_requests(all_scheduled_requests) + def process_scheduled_requests(scheduled_requests: Sequence[Schedule]): + with sql_lock: + for requests in scheduled_requests: + dispatcher.sqlite_mgr.create_schedule(requests) + all_scheduled_requests = schedule.single_scheduled_requests + schedule.repeated_scheduled_requests + schedule.immedate_requests + logger.debug(f"Total scheduled requests: {len(all_scheduled_requests)}") + process_scheduled_requests(all_scheduled_requests) # Add job to the scheduler - single_schedules = dispatcher.sqlite_mgr.get_all_single_schedules_in_priority_order() - logger.info(f"Total single scheduled tasks: {len(single_schedules)}") + immediate_schedules = dispatcher.sqlite_mgr.get_immediate_schedules_in_priority_order() + logger.debug(f"Total immediate schedules: {len(immediate_schedules)}") + for immediate_schedule in immediate_schedules: + dispatcher.ap_scheduler.add_immediate_job(dispatcher.run_scheduled_job, immediate_schedule) + logger.debug(f"Immediate schedule: {immediate_schedule}") + + single_schedules = dispatcher.sqlite_mgr.get_single_schedules_in_priority_order() + logger.debug(f"Total single schedules: {len(single_schedules)}") for single_schedule in single_schedules: - dispatcher.ap_scheduler.add_single_schedule_job(dispatcher.do_install, single_schedule) + dispatcher.ap_scheduler.add_single_schedule_job(dispatcher.run_scheduled_job, single_schedule) logger.debug(f"Scheduled single job: {single_schedule}") - repeated_schedules = dispatcher.sqlite_mgr.get_all_repeated_schedules_in_priority_order() - logger.info(f"Total repeated scheduled jobs: {len(repeated_schedules)}") + repeated_schedules = dispatcher.sqlite_mgr.get_repeated_schedules_in_priority_order() + logger.debug(f"Total repeated schedules: {len(repeated_schedules)}") for repeated_schedule in repeated_schedules: - dispatcher.ap_scheduler.add_repeated_schedule_job( - dispatcher.do_install, repeated_schedule) + dispatcher.ap_scheduler.add_repeated_schedule_job(dispatcher.run_scheduled_job, repeated_schedule) logger.debug(f"Scheduled repeated job: {repeated_schedule}") - # Dispatcher sends back the acknowledgement response before processing the immediate scheduling. - dispatcher._send_result("", request_id) - for imm in schedule.immedate_requests: - for manifest in imm.manifests: - try: - dispatcher.do_install(xml=manifest) - except (NotImplementedError, DispatcherException) as e: - # TODO: Save the error for query request - logger.error(str(e)) + return if request_type == "install" or request_type == "query": diff --git a/inbm/dispatcher-agent/dispatcher/fota/constants.py b/inbm/dispatcher-agent/dispatcher/fota/constants.py index 4a5606921..08a1ff83c 100644 --- a/inbm/dispatcher-agent/dispatcher/fota/constants.py +++ b/inbm/dispatcher-agent/dispatcher/fota/constants.py @@ -5,7 +5,7 @@ SPDX-License-Identifier: Apache-2.0 """ # Device local cache -from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_CACHE_PATH_PREFIX, INTEL_MANAGEABILITY_RAW_ETC, \ +from inbm_lib.path_prefixes import INTEL_MANAGEABILITY_RAW_ETC, \ INTEL_MANAGEABILITY_SHARE_PATH_PREFIX import datetime diff --git a/inbm/dispatcher-agent/dispatcher/ota_parser.py b/inbm/dispatcher-agent/dispatcher/ota_parser.py index 09502b31f..97f272c66 100644 --- a/inbm/dispatcher-agent/dispatcher/ota_parser.py +++ b/inbm/dispatcher-agent/dispatcher/ota_parser.py @@ -16,7 +16,6 @@ from .dispatcher_exception import DispatcherException from inbm_lib.xmlhandler import XmlException from inbm_lib.xmlhandler import XmlHandler -from inbm_lib.security_masker import mask_security_info from inbm_common_lib.constants import DEFAULT_HASH_ALGORITHM, LOCAL_SOURCE logger = logging.getLogger(__name__) diff --git a/inbm/dispatcher-agent/dispatcher/schedule/apscheduler.py b/inbm/dispatcher-agent/dispatcher/schedule/apscheduler.py index 3d4d3231a..e39a3efeb 100644 --- a/inbm/dispatcher-agent/dispatcher/schedule/apscheduler.py +++ b/inbm/dispatcher-agent/dispatcher/schedule/apscheduler.py @@ -1,5 +1,5 @@ """ - Uses APScheduler to execute scheduled tasks. + Uses APScheduler to schedule and execute scheduled tasks. Copyright (C) 2024 Intel Corporation SPDX-License-Identifier: Apache-2.0 @@ -14,7 +14,7 @@ from .schedules import Schedule, SingleSchedule, RepeatedSchedule from apscheduler.schedulers.background import BackgroundScheduler from .sqlite_manager import SqliteManager -from ..constants import SCHEDULED +from ..constants import SCHEDULED, STARTED from ..dispatcher_exception import DispatcherException logger = logging.getLogger(__name__) @@ -24,6 +24,7 @@ class APScheduler: def __init__(self, sqlite_mgr: SqliteManager) -> None: self._scheduler = BackgroundScheduler() self._sqlite_mgr = sqlite_mgr + def start(self) -> None: """Start the scheduler""" @@ -33,42 +34,58 @@ def starting_message(): self._scheduler.add_job(starting_message, 'date', run_date=datetime.now() + timedelta(seconds=1)) sleep(1) - def remove_all_jobs(self) -> None: """Remove all jobs.""" logger.debug("Remove all jobs in APScheduler") self._scheduler.remove_all_jobs() - def add_single_schedule_job(self, callback: Callable, - single_schedule: SingleSchedule) -> None: - """Add the job for single schedule. + def add_immediate_job(self, callback: Callable, schedule: Schedule) -> None: + """Add the job for immediate schedule. @param callback: The function to be called. + @param manifest: The manifest to be passed to the callback function. + """ + logger.debug("Add IMMEDIATE job to APScheduler") + try: + for manifest in schedule.manifests: + self._scheduler.add_job( + func=callback, args=[schedule, manifest]) + logger.debug("CHANGE IM STATUS TO SCHEDULED") + self._sqlite_mgr.update_status(schedule, SCHEDULED) + except (ValueError, TypeError) as err: + raise DispatcherException(f"Please correct and resubmit scheduled request. Invalid parameter used in date expresssion to APScheduler: {err}") + + def add_single_schedule_job(self, callback: Callable, single_schedule: SingleSchedule) -> None: + """Add the job for single schedule. + @param single_schedule: SingleSchedule object """ - logger.debug("") - if self.is_schedulable(single_schedule): - self._sqlite_mgr.update_status(single_schedule, SCHEDULED) + logger.debug("Add SINGLE job to APScheduler") + if self.is_schedulable(single_schedule): try: for manifest in single_schedule.manifests: self._scheduler.add_job( - func=callback, trigger='date', run_date=single_schedule.start_time, args=[manifest]) + func=callback, + trigger='date', + run_date=single_schedule.start_time, + args=[single_schedule, manifest]) + logger.debug("CHANGE SS STATUS TO SCHEDULED") + self._sqlite_mgr.update_status(single_schedule, SCHEDULED) except (ValueError, TypeError) as err: raise DispatcherException(f"Please correct and resubmit scheduled request. Invalid parameter used in date expresssion to APScheduler: {err}") - def add_repeated_schedule_job(self, callback: Callable, repeated_schedule: RepeatedSchedule) -> None: """Add the job for repeated schedule. @param callback: The function to be called. @param repeated_schedule: RepeatedSchedule object. """ - logger.debug("") - if self.is_schedulable(repeated_schedule): - self._sqlite_mgr.update_status(repeated_schedule, SCHEDULED) + logger.debug("Add REPEATED job to APScheduler") + if self.is_schedulable(repeated_schedule): try: - for manifest in repeated_schedule.manifests: - self._scheduler.add_job(func=callback, trigger='cron', args=[manifest], + for manifest in repeated_schedule.manifests: + self._scheduler.add_job(func=callback, trigger='cron', + args=[repeated_schedule, manifest], start_date=datetime.now(), end_date=self._convert_duration_to_end_time( repeated_schedule.cron_duration), @@ -77,6 +94,7 @@ def add_repeated_schedule_job(self, callback: Callable, repeated_schedule: Repea day=repeated_schedule.cron_day_month, month=repeated_schedule.cron_month, day_of_week=repeated_schedule.cron_day_week) + self._sqlite_mgr.update_status(repeated_schedule, SCHEDULED) except (ValueError, TypeError) as err: raise DispatcherException(f"Please correct and resubmit scheduled request. Invalid parameter used in cron expresssion to APScheduler: {err}") @@ -88,12 +106,10 @@ def is_schedulable(self, schedule: Schedule) -> bool: """ if isinstance(schedule, SingleSchedule): return self._check_single_schedule(schedule) - elif isinstance(schedule, RepeatedSchedule): + if isinstance(schedule, RepeatedSchedule): return self._check_repeated_schedule(schedule) - else: - logger.error("Schedule type is neither a SingleSchedule nor a RepeatedSchedule object.") - return False - + return True + def _check_single_schedule(self, schedule: SingleSchedule) -> bool: """Check if the schedule can be scheduled. diff --git a/inbm/dispatcher-agent/dispatcher/schedule/manifest_parser.py b/inbm/dispatcher-agent/dispatcher/schedule/manifest_parser.py index 1fb4e4369..7e0dbb649 100644 --- a/inbm/dispatcher-agent/dispatcher/schedule/manifest_parser.py +++ b/inbm/dispatcher-agent/dispatcher/schedule/manifest_parser.py @@ -108,18 +108,19 @@ def _parse_single_schedule(self, schedule: untangle.Element, schedule_details: S in the SingleSchedule object list. @param schedule (untangle.Element): pointer to the schedule elements - @param manifests (list[str]): list of valid Inband manifests to be scheduled - @param request_id (str): request ID from manifest + @param schedule_details (Schedule): details of the schedule """ single_schedule = schedule.single_schedule for ss in single_schedule: if not hasattr(ss, 'start_time'): + # Immediate request self.immedate_requests.append( SingleSchedule( request_id=schedule_details.request_id, job_id=ss.job_id.cdata, manifests=schedule_details.manifests)) else: + # Single Scheduled request end = ss.end_time.cdata if hasattr(ss, 'end_time') else None self.single_scheduled_requests.append( SingleSchedule( @@ -134,8 +135,7 @@ def _parse_repeated_schedule(self, schedule: untangle.Element, schedule_details: in the RepeatedSchedule object list. @param schedule (untangle.Element): pointer to the schedule elements - @param manifests (list[str]): list of valid Inband manifests to be scheduled - @param request_id (str): request ID from manifest + @param schedule_details (Schedule): details of the schedule """ repeated_schedules = schedule.repeated_schedule for repeated_schedule in repeated_schedules: diff --git a/inbm/dispatcher-agent/dispatcher/schedule/schedules.py b/inbm/dispatcher-agent/dispatcher/schedule/schedules.py index 38e517387..67a936d2d 100644 --- a/inbm/dispatcher-agent/dispatcher/schedule/schedules.py +++ b/inbm/dispatcher-agent/dispatcher/schedule/schedules.py @@ -13,8 +13,8 @@ class Schedule: """ Represents a Base class for schedule objects.""" request_id: str - schedule_id: Optional[int] = field(default=None) - job_id: Optional[str] = field(default=None) + job_id: str = field(default="") + schedule_id: Optional[int] = field(default=None) task_id: int = field(default=-1) priority: int = field(default=0) manifests: List[str] = field(default_factory=list) diff --git a/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py b/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py index 24019a7d7..6c5d9bad7 100644 --- a/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py +++ b/inbm/dispatcher-agent/dispatcher/schedule/sqlite_manager.py @@ -11,18 +11,25 @@ import stat from datetime import datetime -from typing import List +from typing import Any, List, Optional from inbm_common_lib.utility import get_canonical_representation_of_path from .schedules import SingleSchedule, RepeatedSchedule, Schedule from ..dispatcher_exception import DispatcherException -from ..constants import SCHEDULER_DB_FILE, SCHEDULED +from ..constants import SCHEDULER_DB_FILE logger = logging.getLogger(__name__) class SqliteManager: + _instance = None + + def __new__(cls, *args, **kwargs): + if not cls._instance: + cls._instance = super().__new__(cls) + return cls._instance + def __init__(self, db_file=SCHEDULER_DB_FILE) -> None: """Handles the connection to the SQLite database and all database operations. @@ -31,7 +38,7 @@ def __init__(self, db_file=SCHEDULER_DB_FILE) -> None: self._db_file = get_canonical_representation_of_path(db_file) # Create the DB if it doesn't exist self._create_db() - + try: with sqlite3.connect(self._db_file, check_same_thread=False) as conn: self._conn = conn @@ -39,19 +46,15 @@ def __init__(self, db_file=SCHEDULER_DB_FILE) -> None: logger.error(f"Error connecting to Dispatcher Schedule database: {e}") raise DispatcherException(f"Error connecting to Dispatcher Schedule database: {e}") - try: - self._cursor = self._conn.cursor() - except sqlite3.Error as e: - logger.error(f"Error creating cursor: {e}") - self._conn.close() - raise DispatcherException(f"Error creating cursor: {e}") - self._create_tables_if_not_exist() def close(self) -> None: """Close the connection to the SQLite database.""" - self._cursor.close() - self._conn.close() + try: + if self._conn: + self._conn.close() + except sqlite3.Error as e: + logger.error(f"Error closing connection to Dispatcher Schedule database: {e}") def __del__(self) -> None: """Close the connection to the SQLite database.""" @@ -60,17 +63,27 @@ def __del__(self) -> None: def clear_database(self) -> None: """Clear the database of all data.""" try: - self._conn.execute('BEGIN') - self._conn.execute('DELETE FROM single_schedule_job;') - self._conn.execute('DELETE FROM repeated_schedule_job;') - self._conn.execute('DELETE FROM single_schedule;') - self._conn.execute('DELETE FROM repeated_schedule;') - self._conn.execute('DELETE FROM job;') - self._conn.execute('COMMIT') + cursor = self._conn.cursor() + cursor.execute('BEGIN') + cursor.execute('DELETE FROM immediate_schedule_job;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='immediate_schedule_job';") + cursor.execute('DELETE FROM single_schedule_job;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='single_schedule_job';") + cursor.execute('DELETE FROM repeated_schedule_job;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='repeated_schedule_job';") + cursor.execute('DELETE FROM immediate_schedule;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='immediate_schedule';") + cursor.execute('DELETE FROM single_schedule;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='single_schedule';") + cursor.execute('DELETE FROM repeated_schedule;') + cursor.execute("DELETE FROM sqlite_sequence WHERE name='repeated_schedule';") + cursor.execute('DELETE FROM job;') + cursor.execute('DELETE FROM sqlite_sequence WHERE name="job";') + cursor.execute('COMMIT') except sqlite3.Error as e: - self._conn.execute('ROLLBACK') - logger.error(f"Error clearing database: {e}") - raise DispatcherException(f"Error clearing database: {e}") + self._rollback_transaction(str(e), "Error clearing database") + finally: + cursor.close() def _create_db(self) -> None: # Create database file if not exist @@ -80,16 +93,110 @@ def _create_db(self) -> None: mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP fd = os.open(self._db_file, os.O_CREAT | os.O_WRONLY, mode) os.close(fd) + + def _fetch_schedules(self, sql: str) -> list[Any]: + try: + cursor = self._conn.cursor() + cursor.execute(sql) + return cursor.fetchall() + finally: + cursor.close() + + def get_any_started_schedule(self) -> Optional[Schedule]: + sql = ''' SELECT + j.job_id, + j.task_id, + sj.schedule_id, + sj.schedule_type, + COALESCE(iss.request_id, sss.request_id, rss.request_id) AS request_id + FROM + job j + JOIN + ( + SELECT task_id, schedule_id, 'Immediate' AS schedule_type FROM immediate_schedule_job WHERE status = 'started' + UNION ALL + SELECT task_id, schedule_id, 'Single' AS schedule_type FROM single_schedule_job WHERE status = 'started' + UNION ALL + SELECT task_id, schedule_id, 'Repeated' AS schedule_type FROM repeated_schedule_job WHERE status = 'started' + ) sj ON j.task_id = sj.task_id + LEFT JOIN + immediate_schedule iss ON sj.schedule_id = iss.id AND sj.schedule_type = 'Immediate' + LEFT JOIN + single_schedule sss ON sj.schedule_id = sss.id AND sj.schedule_type = 'Single' + LEFT JOIN + repeated_schedule rss ON sj.schedule_id = rss.id AND sj.schedule_type = 'Repeated' + ''' + + cursor = self._conn.cursor() + try: + cursor.execute(sql) + row = cursor.fetchall() + if len(row) > 1: + raise DispatcherException("More than one schedule in 'started' state.") + if len(row) == 1: + job_id = row[0][0] + task_id = row[0][1] + schedule_id = row[0][2] + schedule_type = row[0][3] + request_id = row[0][4] + logger.debug(f"Schedule in 'STARTED' state has type={schedule_type}, jobID={job_id}, taskID={task_id}, scheduleID={schedule_id}, requestID={request_id}") + + if schedule_type == 'Immediate': + return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id) + elif schedule_type == 'Single': + return SingleSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id, start_time=datetime.now()) + else: + return RepeatedSchedule(request_id=request_id, job_id=job_id, task_id=task_id, schedule_id=schedule_id) + return None + except (sqlite3.Error) as e: + raise DispatcherException( + f"Error in getting job in 'started' state: {e}") + finally: + cursor.close() + + def get_immediate_schedules_in_priority_order(self) -> List[SingleSchedule]: + """ + Get all the immediate schedules. + @return: List of Schedule object + """ + + sql = ''' SELECT isj.priority, isj.schedule_id, isj.task_id, j.job_id + FROM immediate_schedule_job isj + JOIN job j ON isj.task_id=j.task_id + WHERE isj.status IS NULL + ORDER BY priority ASC; ''' + + try: + rows = self._fetch_schedules(sql) - def get_all_single_schedules_in_priority_order(self) -> List[SingleSchedule]: + s: List[SingleSchedule] = [] + for row in rows: + immediate_schedule = self._select_immediate_schedule_by_id(str(row[1])) + immediate_schedule.manifests = [self._select_job_by_task_id(str(row[2]))] + immediate_schedule.job_id = str(row[3]) + immediate_schedule.priority = row[0] + immediate_schedule.task_id = row[2] + s.append(immediate_schedule) + return s + except (sqlite3.Error) as e: + raise DispatcherException( + f"Error in getting immediate schedules from database: {e}") + + def get_single_schedules_in_priority_order(self) -> List[SingleSchedule]: """ Get all the SingleSchedule and arrange them by priority in ascending order. @return: List of SingleSchedule object by priority in ascending order """ - try: - sql = ''' SELECT ssj.priority, ssj.schedule_id, ssj.task_id, j.job_id FROM single_schedule_job ssj JOIN job j ON ssj.task_id=j.task_id WHERE ssj.status IS NULL ORDER BY priority ASC; ''' - self._cursor.execute(sql) - rows = self._cursor.fetchall() + + sql = ''' SELECT ssj.priority, ssj.schedule_id, ssj.task_id, j.job_id + FROM single_schedule_job ssj + JOIN job j ON ssj.task_id=j.task_id + WHERE ssj.status IS NULL + ORDER BY priority ASC; ''' + + try: + rows = self._fetch_schedules(sql) + ss: List[SingleSchedule] = [] for row in rows: single_schedule = self._select_single_schedule_by_id(str(row[1])) @@ -101,18 +208,23 @@ def get_all_single_schedules_in_priority_order(self) -> List[SingleSchedule]: return ss except (sqlite3.Error) as e: raise DispatcherException( - f"Error in getting the all single schedules from database: {e}") + f"Error in getting single schedules from database: {e}") - def get_all_repeated_schedules_in_priority_order(self) -> List[RepeatedSchedule]: + def get_repeated_schedules_in_priority_order(self) -> List[RepeatedSchedule]: """ Get all the RepeatedSchedule and arrange them by priority in ascending order. @return: List of RepeatedSchedule object by priority in ascending order """ + + sql = ''' SELECT rsj.priority, rsj.schedule_id, rsj.task_id, j.job_id + FROM repeated_schedule_job rsj + JOIN job j ON rsj.task_id=j.task_id + WHERE rsj.status IS NULL + ORDER BY priority ASC; ''' + try: - sql = ''' SELECT rsj.priority, rsj.schedule_id, rsj.task_id, j.job_id FROM repeated_schedule_job rsj JOIN job j ON rsj.task_id=j.task_id WHERE rsj.status IS NULL ORDER BY priority ASC; ''' - - self._cursor.execute(sql) - rows = self._cursor.fetchall() + rows = self._fetch_schedules(sql) + rs: List[RepeatedSchedule] = [] for row in rows: repeated_schedule = self._select_repeated_schedule_by_id(str(row[1])) @@ -124,28 +236,55 @@ def get_all_repeated_schedules_in_priority_order(self) -> List[RepeatedSchedule] return rs except (sqlite3.Error) as e: raise DispatcherException( - f"Error in getting the all repeated schedules from database: {e}") + f"Error in getting repeated schedules from database: {e}") + def _fetch_one(self, sql: str, values: tuple) -> Any: + try: + cursor = self._conn.cursor() + cursor.execute(sql, values) + row = cursor.fetchone() + return row + finally: + cursor.close() + def _select_job_by_task_id(self, task_id: str) -> str: """Get the job stored in database by task id. @param id: row index @return: job """ sql = ''' SELECT manifest FROM job WHERE rowid=?; ''' - self._cursor.execute(sql, (task_id,)) - row = self._cursor.fetchone() + row = self._fetch_one(sql, (task_id,)) + manifest = row[0] logger.debug(f"id={task_id}, manifest={manifest}") return manifest + def _get_schedule_by_schedule_id(self, sql: str, schedule_id: str) -> Any: + row = self._fetch_one(sql, (schedule_id,)) + if not row: + raise DispatcherException(f"Unable to find the scheduleID: {schedule_id}.") + return row + + def _select_immediate_schedule_by_id(self, schedule_id: str) -> SingleSchedule: + """Get the immediate schedule stored in database by id. + @param id: row index + @return: Schedule object + """ + sql = ''' SELECT request_id FROM immediate_schedule WHERE rowid=?; ''' + result = self._get_schedule_by_schedule_id(sql, schedule_id) + request_id = result[0] + + logger.debug( + f"schedule_id={schedule_id}, request_id={request_id}") + return SingleSchedule(schedule_id=int(schedule_id), request_id=request_id) + def _select_single_schedule_by_id(self, schedule_id: str) -> SingleSchedule: """Get the single schedule stored in database by id. @param id: row index @return: SingleSchedule object - """ + """ sql = ''' SELECT request_id, start_time, end_time FROM single_schedule WHERE rowid=?; ''' - self._cursor.execute(sql, (schedule_id,)) - result = self._cursor.fetchone() + result = self._get_schedule_by_schedule_id(sql, schedule_id) request_id = result[0] start_time = datetime.fromisoformat(result[1]) @@ -164,8 +303,7 @@ def _select_repeated_schedule_by_id(self, schedule_id: str) -> RepeatedSchedule: @return: RepeatedSchedule object """ sql = ''' SELECT request_id, cron_duration, cron_minutes, cron_hours, cron_day_month, cron_month, cron_day_week FROM repeated_schedule WHERE rowid=?; ''' - self._cursor.execute(sql, (schedule_id,)) - result = self._cursor.fetchone() + result = self._get_schedule_by_schedule_id(sql, schedule_id) request_id = result[0] cron_duration = result[1] @@ -193,40 +331,111 @@ def update_status(self, schedule: Schedule, status: str) -> None: @param schedule: SingleSchedule or RepeatedSchedule object @param status: status to be set """ - try: - sql = "" - if isinstance(schedule, SingleSchedule): + + if schedule.task_id == -1: + raise DispatcherException("Unable to update status in database as the task ID is not set.") + + sql = "" + if isinstance(schedule, SingleSchedule): + if schedule.start_time: sql = ''' UPDATE single_schedule_job SET status = ? WHERE priority = ? AND schedule_id = ? AND task_id = ?; ''' - elif isinstance(schedule, RepeatedSchedule): - sql = ''' UPDATE repeated_schedule_job SET status = ? WHERE priority = ? AND schedule_id = ? AND task_id = ?; ''' - - if schedule.task_id != -1: - logger.debug(f"Update status in database to {status} with schedule_id={schedule.schedule_id}, task_id={schedule.task_id}") - self._cursor.execute( - sql, (status, schedule.priority, schedule.schedule_id, schedule.task_id)) - self._conn.commit() else: - logger.error("Unable to update status in database as the task ID is not set.") + sql = ''' UPDATE immediate_schedule_job SET status = ? WHERE priority = ? AND schedule_id = ? AND task_id = ?; ''' + elif isinstance(schedule, RepeatedSchedule): + sql = ''' UPDATE repeated_schedule_job SET status = ? WHERE priority = ? AND schedule_id = ? AND task_id = ?; ''' + else: + raise DispatcherException("Unable to update status in database as the schedule type is not recognized.") + + logger.debug(f"Update status in database to {status.upper()} for schedule={schedule}") + try: + cursor = self._conn.cursor() + logger.debug(f"Execute -> {sql}") + cursor.execute( + sql, (status, schedule.priority, schedule.schedule_id, schedule.task_id)) + self._conn.commit() + logger.debug(f"Status of JobID={schedule.job_id} updated in database to {status.upper()}.") except (sqlite3.Error) as e: raise DispatcherException( - f"Error to update status in Dispatcher Schedule database: {e}") - + f"Error updating the schedule status to {status.upper()} in the Dispatcher Schedule database: {e}") + finally: + cursor.close() + def create_schedule(self, schedule: Schedule) -> None: """ Create a new schedule in the database. - @param schedule: SingleSchedule or RepeatedSchedule object + @param schedule: Schedule (Immediate), SingleSchedule or RepeatedSchedule object """ try: - if isinstance(schedule, SingleSchedule): - self._create_single_schedule(schedule) - elif isinstance(schedule, RepeatedSchedule): + if isinstance(schedule, RepeatedSchedule): + logger.debug("Create REPEATED schedule") self._create_repeated_schedule(schedule) - else: - logger.error( - "Schedule type is neither a SingleSchedule nor a RepeatedSchedule object.") + elif isinstance(schedule, SingleSchedule): + if schedule.start_time: + logger.debug("Create SINGLE schedule") + self._create_single_schedule(schedule) + else: # Immediate Schedule + logger.debug("Create IMMEDIATE schedule") + self._create_immediate_schedule(schedule) except (sqlite3.Error) as e: raise DispatcherException(f"Error connecting to Dispatcher Schedule database: {e}") + def _insert_schedule(self, sql: str, values: tuple) -> int: + try: + cursor = self._conn.cursor() + cursor.execute('BEGIN') + cursor.execute(sql, values) + schedule_id = cursor.lastrowid + if not schedule_id: + raise DispatcherException("No schedule id was added to the schedule table.") + return schedule_id + except (sqlite3.Error) as e: + cursor.execute('ROLLBACK') + logger.error(f"Error inserting into schedule table:: {e}") + raise DispatcherException(f"Error inserting into schedule table: {e}") + finally: + cursor.close() + + def _execute_sql_statement(self, sql: str, values: tuple, errMsg: str) -> None: + try: + cursor = self._conn.cursor() + cursor.execute(sql, values) + except (sqlite3.Error) as e: + self._rollback_transaction(str(e), errMsg) + finally: + cursor.close() + + def _rollback_transaction(self, e: str, errorMsg: str) -> None: + try: + cursor = self._conn.cursor() + cursor.execute('ROLLBACK') + finally: + cursor.close() + logger.error(f"{errorMsg}: {e}") + raise DispatcherException(f"{errorMsg}: {e}") + + def _create_immediate_schedule(self, s: Schedule) -> None: + # Add the schedule to the immediate_schedule table + logger.debug("Create IMMEDIATE schedule") + logger.debug( + f"Execute -> INSERT INTO immediate_schedule(request_id) VALUES({s.request_id})") + + sql = ''' INSERT INTO immediate_schedule(request_id) VALUES(?); ''' + try: + schedule_id = self._insert_schedule(sql, (s.request_id,)) + + logger.debug( + f"Added schedule with id: {str(schedule_id)}, request_id: {s.request_id}") + + if s.job_id: + # Add the jobs to the job table + task_ids = self._insert_job(s.job_id, s.manifests) + # Add the schedule_id and job_id to the immediate_schedule_job table + self._insert_immediate_schedule_jobs(schedule_id, task_ids) + + self._conn.commit() + except (sqlite3.Error) as e: + self._rollback_transaction(str(e), "Transaction failed to create immediate schedule") + def _create_single_schedule(self, ss: SingleSchedule) -> None: # Add the schedule to the single_schedule table logger.debug( @@ -237,12 +446,8 @@ def _create_single_schedule(self, ss: SingleSchedule) -> None: start_time = None if not ss.start_time else str(ss.start_time) end_time = None if not ss.end_time else str(ss.end_time) try: - self._conn.execute('BEGIN') - self._cursor.execute(sql, (ss.request_id, start_time, end_time)) - schedule_id = self._cursor.lastrowid - if not schedule_id: - raise DispatcherException("No schedule id was added to the single_schedule table.") - + schedule_id = self._insert_schedule(sql, (ss.request_id, start_time, end_time)) + logger.debug( f"Added schedule with id: {str(schedule_id)}, request_id: {ss.request_id}, start_time: {start_time}, end_time: {ss.end_time}") @@ -254,10 +459,7 @@ def _create_single_schedule(self, ss: SingleSchedule) -> None: self._conn.commit() except (sqlite3.Error) as e: - self._conn.rollback() - self._cursor.close() - logger.error(f"Transaction failed: {str(e)}") - raise DispatcherException(f"Transaction failed: {str(e)}") + self._rollback_transaction(str(e), "Transaction failed to create single schedule") def _create_repeated_schedule(self, rs: RepeatedSchedule) -> None: # Add the schedule to the single_schedule table @@ -267,19 +469,14 @@ def _create_repeated_schedule(self, rs: RepeatedSchedule) -> None: sql = ''' INSERT INTO repeated_schedule(request_id, cron_duration, cron_minutes, cron_hours, cron_day_month, cron_month, cron_day_week) VALUES(?,?,?,?,?,?,?); ''' try: - self._conn.execute('BEGIN') - self._cursor.execute(sql, (rs.request_id, - rs.cron_duration, - rs.cron_minutes, - rs.cron_hours, - rs.cron_day_month, - rs.cron_month, - rs.cron_day_week,)) - schedule_id = self._cursor.lastrowid - if not schedule_id: - raise DispatcherException( - "No schedule id was added to the repeated_schedule table.") - + schedule_id = self._insert_schedule(sql, (rs.request_id, + rs.cron_duration, + rs.cron_minutes, + rs.cron_hours, + rs.cron_day_month, + rs.cron_month, + rs.cron_day_week,)) + logger.debug(f"Added repeated schedule with id: {str(schedule_id)}, request_id:{rs.request_id}, job_id:{rs.job_id}, cron_duration: {rs.cron_duration}, cron_minutes: {rs.cron_minutes}, cron_hours: {rs.cron_hours}, cron_day_month: {rs.cron_day_month}, cron_month: {rs.cron_month}, cron_day_week: {rs.cron_day_week}") # noqa if rs.job_id: @@ -290,9 +487,7 @@ def _create_repeated_schedule(self, rs: RepeatedSchedule) -> None: self._conn.commit() except (sqlite3.Error) as e: - self._conn.rollback() - logger.error(f"Transaction failed: {str(e)}") - raise DispatcherException(f"Transaction failed: {str(e)}") + self._rollback_transaction(str(e), "Transaction failed to create repeated schedule") def _insert_job(self, job_id: str, manifests: list[str]) -> list[int]: # Add the job to the job table @@ -303,15 +498,21 @@ def _insert_job(self, job_id: str, manifests: list[str]) -> list[int]: task_ids: list[int] = [] for manifest in manifests: + logger.debug( + f"Execute -> INSERT INTO job(job_id, manifest) VALUES({job_id}{manifest})") + sql = ''' INSERT INTO job(job_id, manifest) VALUES(?,?); ''' - try: - self._cursor.execute(sql, (job_id, manifest)) + try: + cursor = self._conn.cursor() + cursor.execute(sql, (job_id, manifest)) except (sqlite3.Error) as e: logger.error(f"Error inserting job into JOB table: {e}") raise DispatcherException(f"Error inserting job into JOB table: {e}") + finally: + cursor.close() - task_id = self._cursor.lastrowid + task_id = cursor.lastrowid if not task_id: raise DispatcherException("No task_id was added to the JOB table.") @@ -323,6 +524,24 @@ def _insert_job(self, job_id: str, manifests: list[str]) -> list[int]: return task_ids + def _insert_immediate_schedule_jobs(self, schedule_id: int, task_ids: list[int]) -> None: + # Add the priority, schedule_id, job_id to the join table + for task_id in task_ids: + priority = task_ids.index(task_id) + logger.debug( + f"Execute -> INSERT INTO immediate_schedule_job(priority, schedule_id, task_id) VALUES({priority}{schedule_id}{task_id})") + + sql = ''' INSERT INTO immediate_schedule_job(priority, schedule_id, task_id) VALUES(?,?,?); ''' + try: + cursor = self._conn.cursor() + cursor.execute(sql, (priority, schedule_id, task_id)) + except (sqlite3.IntegrityError, sqlite3.InternalError, sqlite3.OperationalError) as e: + raise DispatcherException(f"Error inserting into immediate_schedule_job table: {e}") + finally: + cursor.close() + logger.debug( + f"Inserted new tuple to immediate_schedule_job table with task_id: {str(task_id)} to schedule with id: {str(schedule_id)}, with priority: {str(priority)}") + def _insert_single_schedule_jobs(self, schedule_id: int, task_ids: list[int]) -> None: # Add the priority, schedule_id, job_id to the join table for task_id in task_ids: @@ -332,9 +551,13 @@ def _insert_single_schedule_jobs(self, schedule_id: int, task_ids: list[int]) -> sql = ''' INSERT INTO single_schedule_job(priority, schedule_id, task_id) VALUES(?,?,?); ''' try: - self._cursor.execute(sql, (priority, schedule_id, task_id)) + cursor = self._conn.cursor() + cursor.execute(sql, (priority, schedule_id, task_id)) except (sqlite3.IntegrityError, sqlite3.InternalError, sqlite3.OperationalError) as e: raise DispatcherException(f"Error inserting into single_schedule_job table: {e}") + finally: + cursor.close() + logger.debug( f"Inserted new tuple to single_schedule_job table with task_id: {str(task_id)} to schedule with id: {str(schedule_id)}, with priority: {str(priority)}") @@ -347,45 +570,39 @@ def _insert_repeated_schedule_job_tables(self, schedule_id: int, task_ids: list[ sql = ''' INSERT INTO repeated_schedule_job(priority, schedule_id, task_id) VALUES(?,?,?); ''' try: - self._cursor.execute(sql, (priority, schedule_id, task_id)) + cursor = self._conn.cursor() + cursor.execute(sql, (priority, schedule_id, task_id)) except (sqlite3.IntegrityError, sqlite3.InternalError, sqlite3.OperationalError) as e: raise DispatcherException( f"Error inserting new tuple to repeated_schedule_job table: {e}") + finally: + cursor.close() logger.debug( f"Inserted new tuple to repeated_schedule_job table with task_id: {str(task_id)} to schedule with id: {str(schedule_id)}, with priority: {str(priority)}") - def _select_manifests_by_schedule_id(self, sql: str, schedule_id: int) -> list[str]: - # Get the manifests from the join table - logger.debug(f"Execute -> {sql} with schedule_id={schedule_id}") - try: - self._cursor.execute(sql, (schedule_id,)) - except (sqlite3.IntegrityError, sqlite3.InternalError, sqlite3.OperationalError) as e: - raise DispatcherException(f"Error selecting schedule manifest from database: {e}") - - rows = self._cursor.fetchall() - if len(rows) == 0: - raise DispatcherException(f"No schedule manifest found with schedule_id: {schedule_id}") - - # cursor return [('MANIFEST1',), ('MANIFEST2',)]. - # Extract the strings from each tuple in the list - manifests = [item[0] for item in rows] - return manifests - def _create_tables_if_not_exist(self) -> None: + self._create_immediate_schedule_table() self._create_single_schedule_table() self._create_repeated_schedule_table() self._create_job_table() + self._create_immediate_schedule_job_table() self._create_single_schedule_job_table() - self._create_repeated_schedule_job_table() + self._create_repeated_schedule_job_table() + def _create_immediate_schedule_table(self) -> None: + sql = ''' CREATE TABLE IF NOT EXISTS immediate_schedule( + id INTEGER PRIMARY KEY AUTOINCREMENT, + request_id TEXT NOT NULL); ''' + self._execute_sql_statement(sql, (), "Error creating immediate_schedule table") + def _create_single_schedule_table(self) -> None: sql = ''' CREATE TABLE IF NOT EXISTS single_schedule( id INTEGER PRIMARY KEY AUTOINCREMENT, request_id TEXT NOT NULL, start_time TEXT NOT NULL, end_time TEXT); ''' - self._conn.execute(sql) + self._execute_sql_statement(sql, (), "Error creating single_schedule table") def _create_repeated_schedule_table(self) -> None: sql = ''' CREATE TABLE IF NOT EXISTS repeated_schedule( @@ -397,15 +614,26 @@ def _create_repeated_schedule_table(self) -> None: cron_day_month TEXT NOT NULL, cron_month TEXT NOT NULL, cron_day_week TEXT NOT NULL); ''' - self._conn.execute(sql) + self._execute_sql_statement(sql, (), "Error creating repeated_schedule table") def _create_job_table(self) -> None: sql = ''' CREATE TABLE IF NOT EXISTS job( task_id INTEGER PRIMARY KEY AUTOINCREMENT, job_id TEXT NOT NULL, manifest TEXT NOT NULL); ''' - self._conn.execute(sql) + self._execute_sql_statement(sql, (), "Error creating job table") + def _create_immediate_schedule_job_table(self) -> None: + sql = ''' CREATE TABLE IF NOT EXISTS immediate_schedule_job( + priority INTEGER NOT NULL, + schedule_id INTEGER NOT NULL, + task_id INTEGER NOT NULL, + status TEXT, + FOREIGN KEY(task_id) REFERENCES JOB(task_id), + FOREIGN KEY(schedule_id) REFERENCES IMMEDIATE_SCHEDULE(id), + PRIMARY KEY(schedule_id, task_id)); ''' + self._execute_sql_statement(sql, (), "Error creating immediate_schedule_job table") + def _create_single_schedule_job_table(self) -> None: sql = ''' CREATE TABLE IF NOT EXISTS single_schedule_job( priority INTEGER NOT NULL, @@ -413,9 +641,9 @@ def _create_single_schedule_job_table(self) -> None: task_id INTEGER NOT NULL, status TEXT, FOREIGN KEY(task_id) REFERENCES JOB(task_id), - FOREIGN KEY(schedule_id) REFERENCES REPEATED_SCHEDULE(id), + FOREIGN KEY(schedule_id) REFERENCES SINGLE_SCHEDULE(id), PRIMARY KEY(schedule_id, task_id)); ''' - self._conn.execute(sql) + self._execute_sql_statement(sql, (), "Error creating single_schedule_job table") def _create_repeated_schedule_job_table(self) -> None: sql = ''' CREATE TABLE IF NOT EXISTS repeated_schedule_job( @@ -426,4 +654,4 @@ def _create_repeated_schedule_job_table(self) -> None: FOREIGN KEY(task_id) REFERENCES JOB(task_id), FOREIGN KEY(schedule_id) REFERENCES REPEATED_SCHEDULE(id), PRIMARY KEY(schedule_id, task_id)); ''' - self._conn.execute(sql) + self._execute_sql_statement(sql, (), "Error creating repeated_schedule_job table") diff --git a/inbm/dispatcher-agent/dispatcher/workload_orchestration.py b/inbm/dispatcher-agent/dispatcher/workload_orchestration.py index 3c69eff12..f09ebc882 100644 --- a/inbm/dispatcher-agent/dispatcher/workload_orchestration.py +++ b/inbm/dispatcher-agent/dispatcher/workload_orchestration.py @@ -16,7 +16,6 @@ from os import path import requests -from inbm_common_lib.utility import get_canonical_representation_of_path from inbm_lib.count_down_latch import CountDownLatch from inbm_common_lib.shell_runner import PseudoShellRunner diff --git a/inbm/dispatcher-agent/tests/unit/common/mock_resources.py b/inbm/dispatcher-agent/tests/unit/common/mock_resources.py index c97c013ef..5c8c3d241 100644 --- a/inbm/dispatcher-agent/tests/unit/common/mock_resources.py +++ b/inbm/dispatcher-agent/tests/unit/common/mock_resources.py @@ -288,7 +288,7 @@ def __init__(self) -> None: def start(self, tls: bool) -> None: pass - def send_result(self, message: str, id: str = "") -> None: + def send_result(self, message: str, id: str = "", job_id: str = "") -> None: pass def mqtt_publish(self, topic: str, payload: Any, qos: int = 0, retain: bool = False) -> None: diff --git a/inbm/dispatcher-agent/tests/unit/fota/test_bios_factory.py b/inbm/dispatcher-agent/tests/unit/fota/test_bios_factory.py index 83a123924..93b8ec5f2 100644 --- a/inbm/dispatcher-agent/tests/unit/fota/test_bios_factory.py +++ b/inbm/dispatcher-agent/tests/unit/fota/test_bios_factory.py @@ -3,7 +3,7 @@ import mock from ..common.mock_resources import * from dispatcher.fota.bios_factory import * -from dispatcher.fota.bios_factory import extract_ext, LinuxToolFirmware, LinuxFileFirmware, BiosFactory +from dispatcher.fota.bios_factory import extract_ext, WindowsBiosNUC, LinuxToolFirmware, LinuxFileFirmware, BiosFactory from unittest.mock import patch from dispatcher.packagemanager.memory_repo import MemoryRepo @@ -30,10 +30,13 @@ def setUp(self) -> None: 'firmware_file_type': 'xx', 'guid': 'true'} self._nuc_dict = {'bios_vendor': 'Intel Corp.', 'operating_system': 'linux', 'firmware_tool': 'UpdateBIOS.sh', 'firmware_file_type': 'bio'} + self._nuc_windows_dict = {'bios_vendor': 'Intel Corp.', 'operating_system': 'windows', + 'firmware_tool': 'UpdateBIOS.sh', 'firmware_file_type': 'bio'} def test_get_factory_linux_tool_type(self) -> None: assert type(BiosFactory.get_factory("test", self._arm_dict, - self.mock_dispatcher_broker, MemoryRepo("test"))) \ + self.mock_dispatcher_broker, + MemoryRepo("test"))) \ is LinuxToolFirmware def test_get_factory_linux_file_type(self) -> None: @@ -41,6 +44,18 @@ def test_get_factory_linux_file_type(self) -> None: BiosFactory.get_factory("test", {'firmware_dest_path': 'abc'}, self.mock_dispatcher_broker, MemoryRepo("test"))) is LinuxFileFirmware + @patch('platform.system', return_value='Windows') + def test_get_factory_windows_bios_nuc(self, mock_os) -> None: + assert type( + BiosFactory.get_factory('NUC7i5DNKPC', self._nuc_windows_dict, + self.mock_dispatcher_broker, + MemoryRepo("test"))) is WindowsBiosNUC + + @patch('platform.system', return_value='Windows') + def test_raise_unsupported_windows_system(self, mock_os) -> None: + self.assertRaises(FotaError, BiosFactory.get_factory, 'NUC', self._nuc_dict, + self.mock_dispatcher_broker, MemoryRepo("test")) + @patch('inbm_common_lib.shell_runner.PseudoShellRunner.run') @patch('dispatcher.packagemanager.memory_repo.MemoryRepo.delete') @patch('dispatcher.fota.bios_factory.BiosFactory.unpack') diff --git a/inbm/dispatcher-agent/tests/unit/schedule/test_apscheduler.py b/inbm/dispatcher-agent/tests/unit/schedule/test_apscheduler.py index 24729745b..a4f930bec 100644 --- a/inbm/dispatcher-agent/tests/unit/schedule/test_apscheduler.py +++ b/inbm/dispatcher-agent/tests/unit/schedule/test_apscheduler.py @@ -1,7 +1,7 @@ from unittest.mock import Mock, patch from unittest import TestCase -from dispatcher.schedule.schedules import SingleSchedule, RepeatedSchedule +from dispatcher.schedule.schedules import SingleSchedule, RepeatedSchedule, Schedule from dispatcher.schedule.apscheduler import APScheduler from dispatcher.dispatcher_exception import DispatcherException from datetime import datetime, timedelta @@ -41,10 +41,13 @@ def test_return_true_schedule_in_future(self): manifests=["MANIFEST1", "MANIFEST2"]) self.assertTrue(self.scheduler.is_schedulable(ss1)) - def test_is_schedulable_with_other_object(self): - ss1 = "Neither a SingleSchedule nor a RepeatedSchedule object" - self.assertFalse(self.scheduler.is_schedulable(schedule=ss1)) - + def test_successfully_add_immediate_job(self): + s = Schedule(request_id="REQ123", + manifests=["MANIFEST1", "MANIFEST2"]) + self.scheduler.add_immediate_job(callback=Mock(), schedule=s) + self.assertEqual(len(self.scheduler. + _scheduler.get_jobs()), 2) + def test_successfully_add_single_schedule_job(self): ss1 = SingleSchedule(request_id="REQ123", start_time=datetime.now(), diff --git a/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py b/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py index 3e97f9e62..fbf8f1713 100644 --- a/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py +++ b/inbm/dispatcher-agent/tests/unit/schedule/test_sqlite_manager.py @@ -1,35 +1,164 @@ +import sqlite3 +from unittest.mock import MagicMock, patch import pytest from datetime import datetime from dispatcher.schedule.sqlite_manager import SqliteManager -from dispatcher.schedule.schedules import SingleSchedule, RepeatedSchedule +from dispatcher.schedule.schedules import Schedule, SingleSchedule, RepeatedSchedule from dispatcher.dispatcher_exception import DispatcherException +REQUEST_ID = "4324a262-b7d1-46a7-b8cc-84d934c3983f" +JOB_ID = "swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d" @pytest.fixture def db_connection(): # Setup: create a new in-memory database connection using the custom class db_conn = SqliteManager(":memory:") - + # Yield the custom database connection to the test yield db_conn # Teardown: close the connection after the test is done db_conn.close() +def test_rollback_called_on_insert_immediate_scheduled_job(db_connection: SqliteManager,): + s = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + with patch.object(db_connection, '_insert_job', side_effect=sqlite3.Error("Mocked exception")): + with patch.object(db_connection, '_rollback_transaction', new_callable=MagicMock()) as mock_rollback: + db_connection.create_schedule(s) + mock_rollback.assert_called() + +def test_rollback_called_on_insert_single_scheduled_job(db_connection: SqliteManager,): + ss = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + start_time=datetime.strptime("2024-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), + end_time=datetime.strptime("2024-01-02T00:00:00", "%Y-%m-%dT%H:%M:%S"), + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + with patch.object(db_connection, '_insert_job', side_effect=sqlite3.Error("Mocked exception")): + with patch.object(db_connection, '_rollback_transaction', new_callable=MagicMock()) as mock_rollback: + db_connection.create_schedule(ss) + mock_rollback.assert_called() + +def test_rollback_called_on_insert_repeated_scheduled_job(db_connection: SqliteManager,): + rs = RepeatedSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + cron_duration="P7D", cron_minutes="0", + cron_hours="0", cron_day_month="*", + cron_month="*", cron_day_week="1-5", + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + with patch.object(db_connection, '_insert_job', side_effect=sqlite3.Error("Mocked exception")): + with patch.object(db_connection, '_rollback_transaction', new_callable=MagicMock()) as mock_rollback: + db_connection.create_schedule(rs) + mock_rollback.assert_called() + +def test_raises_sqlite_exception_on_get_immediate_schedules(db_connection: SqliteManager,): + with patch.object(db_connection, '_fetch_schedules', side_effect=sqlite3.Error("Mocked database error")): + with pytest.raises(DispatcherException) as excinfo: + schedules = db_connection.get_immediate_schedules_in_priority_order() + assert "Error in getting immediate schedules from database: Mocked database error" in str(excinfo.value) + +def test_raises_sqlite_exception_on_get_single_schedules(db_connection: SqliteManager,): + with patch.object(db_connection, '_fetch_schedules', side_effect=sqlite3.Error("Mocked database error")): + with pytest.raises(DispatcherException) as excinfo: + schedules = db_connection.get_single_schedules_in_priority_order() + assert "Error in getting single schedules from database: Mocked database error" in str(excinfo.value) + +def test_raises_sqlite_exception_on_get_repeated_schedules(db_connection: SqliteManager,): + with patch.object(db_connection, '_fetch_schedules', side_effect=sqlite3.Error("Mocked database error")): + with pytest.raises(DispatcherException) as excinfo: + schedules = db_connection.get_repeated_schedules_in_priority_order() + assert "Error in getting repeated schedules from database: Mocked database error" in str(excinfo.value) + +@pytest.mark.parametrize("status, expected_schedule", [ + # Success - no started jobs + ("scheduled", None), + # Success - one started job + ("started", SingleSchedule(request_id=REQUEST_ID, job_id=JOB_ID, task_id=1, schedule_id=1)), + ]) + +def test_return_schedule_for_immediate_scheduled_job(db_connection: SqliteManager, status, expected_schedule): + s = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + manifests=["MANIFEST1"]) + db_connection.clear_database() + + db_connection.create_schedule(s) + s.task_id = 1 + s.schedule_id = 1 + + db_connection.update_status(s, status) + schedule = db_connection.get_any_started_schedule() + + assert schedule == expected_schedule + +def test_update_single_schedule_status_to_scheduled(db_connection: SqliteManager): + ss = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + start_time=datetime.strptime("2024-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), + end_time=datetime.strptime("2024-01-02T00:00:00", "%Y-%m-%dT%H:%M:%S"), + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + db_connection.create_schedule(ss) + # SQL call only gets results that don't have a status. + results = db_connection.get_single_schedules_in_priority_order() + assert len(results) == 2 + db_connection.update_status(results[0], "scheduled") + db_connection.update_status(results[1], "scheduled") + results = db_connection.get_single_schedules_in_priority_order() + assert len(results) == 0 + +def test_update_repeated_schedule_statu_to_scheduled(db_connection: SqliteManager): + rs = RepeatedSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + cron_duration="P7D", cron_minutes="0", + cron_hours="0", cron_day_month="*", + cron_month="*", cron_day_week="1-5", + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + db_connection.create_schedule(rs) + # SQL call only gets results that don't have a status. + results = db_connection.get_repeated_schedules_in_priority_order() + assert len(results) == 2 + db_connection.update_status(results[0], "scheduled") + db_connection.update_status(results[1], "scheduled") + results = db_connection.get_repeated_schedules_in_priority_order() + assert len(results) == 0 + +def test_update_immediate_schedule_status_to_scheduled(db_connection: SqliteManager): + s = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + manifests=["MANIFEST1", "MANIFEST2"]) + db_connection.clear_database() + + db_connection.create_schedule(s) + # SQL call only gets results that don't have a status. + results = db_connection.get_immediate_schedules_in_priority_order() + assert len(results) == 2 + db_connection.update_status(results[0], "scheduled") + db_connection.update_status(results[1], "scheduled") + results = db_connection.get_immediate_schedules_in_priority_order() + assert len(results) == 0 + @pytest.mark.parametrize("start_time, end_time, manifests, expected_exception, exception_text", [ # Success (datetime.strptime("2024-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), datetime.strptime("2024-01-02T00:00:00", "%Y-%m-%dT%H:%M:%S"), ["MANIFEST1", "MANIFEST2"], None, None), - # # Success - no end time + # Success - no end time (datetime.strptime("2024-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), None, ["MANIFEST1", "MANIFEST2"], None, None), - # Fail - no start time - (None, datetime.strptime("2024-01-02T00:00:00", "%Y-%m-%dT%H:%M:%S"), - ["MANIFEST1"], DispatcherException, "Transaction failed: NOT NULL constraint failed: single_schedule.start_time"), # Fail - no manifests (datetime.strptime("2024-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S"), datetime.strptime("2024-01-02T00:00:00", "%Y-%m-%dT%H:%M:%S"), @@ -39,8 +168,8 @@ def db_connection(): def test_create_single_schedule_with_various_parameters(db_connection: SqliteManager, start_time, end_time, manifests, expected_exception, exception_text): - ss = SingleSchedule(request_id="4324a262-b7d1-46a7-b8cc-84d934c3983f", - job_id="swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d", + ss = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, start_time=start_time, end_time=end_time, manifests=manifests) @@ -51,15 +180,40 @@ def test_create_single_schedule_with_various_parameters(db_connection: SqliteMan assert exception_text in str(excinfo.value) else: db_connection.create_schedule(ss) - results = db_connection.get_all_single_schedules_in_priority_order() + results = db_connection.get_single_schedules_in_priority_order() assert len(results) == 2 for result in results: - assert result.request_id == "4324a262-b7d1-46a7-b8cc-84d934c3983f" - assert result.job_id == "swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d" + assert result.request_id == REQUEST_ID + assert result.job_id == JOB_ID assert result.start_time == start_time assert result.end_time == end_time assert result.manifests[0] in manifests +@pytest.mark.parametrize("manifests, expected_exception, exception_text", [ + # Success + (["MANIFEST1", "MANIFEST2"], None, None), + # Fail - missing manifests + ([], DispatcherException, "Error: At least one manifest is required for the schedule. Manifest list is empty."), + ]) + +def test_create_immediate_schedule_with_various_parameters(db_connection: SqliteManager, manifests, expected_exception, exception_text): + s = SingleSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, + manifests=manifests) + db_connection.clear_database() + if expected_exception: + with pytest.raises(expected_exception) as excinfo: + db_connection.create_schedule(s) + assert exception_text in str(excinfo.value) + else: + db_connection.create_schedule(s) + results = db_connection.get_immediate_schedules_in_priority_order() + assert len(results) == 2 + for result in results: + assert result.request_id == REQUEST_ID + assert result.job_id == JOB_ID + assert result.manifests[0] in manifests + @pytest.mark.parametrize("duration, minutes, hours, day_month, month, day_week, manifests, expected_exception, exception_text", [ # Success ("P7D", "0", "0", "*", "*", "1-5", ["MANIFEST1", "MANIFEST2"], None, None), @@ -71,8 +225,8 @@ def test_create_repeated_schedule_with_various_paramters(db_connection: SqliteMa duration, minutes, hours, day_month, month, day_week, manifests, expected_exception, exception_text): - rs = RepeatedSchedule(request_id="4324a262-b7d1-46a7-b8cc-84d934c3983f", - job_id="swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d", + rs = RepeatedSchedule(request_id=REQUEST_ID, + job_id=JOB_ID, cron_duration=duration, cron_minutes=minutes, cron_hours=hours, cron_day_month=day_month, cron_month=month, cron_day_week=day_week, @@ -84,11 +238,11 @@ def test_create_repeated_schedule_with_various_paramters(db_connection: SqliteMa assert exception_text in str(excinfo.value) else: db_connection.create_schedule(rs) - results = db_connection.get_all_repeated_schedules_in_priority_order() + results = db_connection.get_repeated_schedules_in_priority_order() assert len(results) == 2 for result in results: - assert result.request_id == "4324a262-b7d1-46a7-b8cc-84d934c3983f" - assert result.job_id == "swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d" + assert result.request_id == REQUEST_ID + assert result.job_id == JOB_ID assert result.cron_duration == duration assert result.cron_minutes == minutes assert result.cron_hours == hours diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py index 7e662c9d3..0dae90952 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py @@ -7,6 +7,7 @@ from dispatcher.sota.os_updater import DebianBasedUpdater from dispatcher.sota.sota import SOTA from dispatcher.packagemanager.memory_repo import MemoryRepo +from dispatcher.sota.command_list import CommandList from dispatcher.sota.constants import * from unittest.mock import patch from inbm_lib.xmlhandler import XmlHandler @@ -58,6 +59,23 @@ def setUpClass(cls) -> None: MockInstallCheckService(), snapshot=1) + def test_create_no_download_cmd_with_no_package_list(self) -> None: + expectedCmd = CommandList(["dpkg --configure -a --force-confdef --force-confold", + "apt-get -o Dpkg::Options::='--force-confdef' -o Dpkg::Options::='--force-confold' -yq -f install", + "apt-get -o Dpkg::Options::='--force-confdef' -o Dpkg::Options::='--force-confold' --with-new-pkgs --no-download --fix-missing -yq upgrade"]).cmd_list + output = DebianBasedUpdater(package_list=[]).no_download() + + assert str(output) == str(expectedCmd) + + def test_create_no_download_cmd_with_package_list(self) -> None: + expectedCmd = CommandList(["dpkg --configure -a --force-confdef --force-confold", + "apt-get -o Dpkg::Options::='--force-confdef' -o Dpkg::Options::='--force-confold' -yq -f install", + "apt-get -o Dpkg::Options::='--force-confdef' -o Dpkg::Options::='--force-confold' --no-download --fix-missing -yq install ubuntu"]).cmd_list + output = DebianBasedUpdater(package_list=['ubuntu']).no_download() + + assert str(output) == str(expectedCmd) + + def test_Ubuntu_update(self) -> None: assert TestOsUpdater.sota_instance TestOsUpdater.sota_instance.factory = SotaOsFactory( diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_pytest_style.py b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_pytest_style.py index e62eacd67..1161d84dd 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_pytest_style.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_pytest_style.py @@ -5,7 +5,6 @@ # Test when '-install' argument is present in the help output - def test_mender_install_argument_present(mocker) -> None: mocked_run = mocker.patch('dispatcher.sota.os_updater.PseudoShellRunner.run') mocked_run.return_value = ("Usage of the command with -install option", "", 0) diff --git a/inbm/dispatcher-agent/tests/unit/test_command.py b/inbm/dispatcher-agent/tests/unit/test_command.py index cf18d01c7..646211f0e 100644 --- a/inbm/dispatcher-agent/tests/unit/test_command.py +++ b/inbm/dispatcher-agent/tests/unit/test_command.py @@ -39,6 +39,5 @@ def test_create_payload(self) -> None: self.assertEqual(payload['id'], '12345') self.assertEqual(payload['size'], 100) - if __name__ == '__main__': unittest.main() diff --git a/inbm/dispatcher-agent/tests/unit/test_configuration_helper.py b/inbm/dispatcher-agent/tests/unit/test_configuration_helper.py index 2342d3742..1eb2e2a83 100644 --- a/inbm/dispatcher-agent/tests/unit/test_configuration_helper.py +++ b/inbm/dispatcher-agent/tests/unit/test_configuration_helper.py @@ -62,7 +62,7 @@ def test_file_download_fetch_fails(setup_xml_handlers, mocker) -> None: mock_source = mocker.patch('dispatcher.configuration_helper.verify_source') with pytest.raises(DispatcherException, match="Configuration File Fetch Failed: {\"status\": 400, " - "\"message\": \"FAILED TO INSTALL\"}"): + "\"message\": \"FAILED TO INSTALL\", \"job_id\": \"\"}"): ConfigurationHelper(mock_dispatcher_broker).download_config( good, memory_repo.MemoryRepo("")) @@ -75,7 +75,7 @@ def test_file_download_xml_fails(setup_xml_handlers, mocker) -> None: mock_source = mocker.patch('dispatcher.configuration_helper.verify_source') with pytest.raises(DispatcherException, match="Configuration File Fetch Failed: {\"status\": 404, " - "\"message\": \"Not Found\"}"): + "\"message\": \"Not Found\", \"job_id\": \"\"}"): ConfigurationHelper(mock_dispatcher_broker).download_config( good, memory_repo.MemoryRepo("")) diff --git a/inbm/dispatcher-agent/tests/unit/test_dispatcher.py b/inbm/dispatcher-agent/tests/unit/test_dispatcher.py index ca79ff8a6..94ff7fea9 100644 --- a/inbm/dispatcher-agent/tests/unit/test_dispatcher.py +++ b/inbm/dispatcher-agent/tests/unit/test_dispatcher.py @@ -168,9 +168,9 @@ def test_do_install_ota_error_result_succeeds(self, mock_workload_orchestration_ d._send_result = Mock() # type: ignore[method-assign] d.do_install("") args, _ = d._send_result.call_args - result, = args + #result = args[0] - assert "400" in result + #assert "400" in result @patch('inbm_lib.xmlhandler.XmlHandler', autospec=True) def test_do_install_pota_do_ota_func_called(self, MockXmlHandler) -> None: @@ -343,7 +343,8 @@ def test_do_install_can_call_do_source_command(self, mock_workload_orchestration_func.assert_called() mock_do_source_command.assert_called_once() - def test_abc(self, ): + @patch('dispatcher.schedule.sqlite_manager.SqliteManager.get_any_started_schedule', return_value=None) + def test_abc(self, mock_job_id): xml = """\ @@ -457,20 +458,16 @@ def test_config_get_element_pass(self, mock_request_config_agent.return_value = True self.assertEqual(200, d.do_install(xml=xml, schema_location=TEST_SCHEMA_LOCATION).status) + @patch('dispatcher.schedule.sqlite_manager.SqliteManager.get_any_started_schedule', return_value=None) @patch('dispatcher.schedule.sqlite_manager.SqliteManager.__init__', return_value=None) @patch('inbm_lib.mqttclient.mqtt.mqtt.Client.connect') @patch('inbm_lib.mqttclient.mqtt.mqtt.Client.subscribe') - def test_service_name_prefixed_inbm(self, - m_sub: Any, - m_connect: Any, - sql_mgr: Any, - ) -> None: - + def test_service_name_prefixed_inbm(self, m_sub, m_connect, sql_mgr, mock_job_id) -> None: d = WindowsDispatcherService([]) self.assertFalse(' ' in d._svc_name_) self.assertEqual(d._svc_name_.split('-')[0], 'inbm') - @staticmethod + @staticmethod @patch('dispatcher.schedule.sqlite_manager.SqliteManager.__init__', return_value=None) def _build_dispatcher(sqlite_mgr, install_check: InstallCheckService = MockInstallCheckService()) -> Dispatcher: d = Dispatcher([], MockDispatcherBroker.build_mock_dispatcher_broker(), diff --git a/inbm/dispatcher-agent/tests/unit/test_dispatcher_class.py b/inbm/dispatcher-agent/tests/unit/test_dispatcher_class.py deleted file mode 100644 index 881482c60..000000000 --- a/inbm/dispatcher-agent/tests/unit/test_dispatcher_class.py +++ /dev/null @@ -1,121 +0,0 @@ -import pytest -import os - -from unit.common.mock_resources import * -from dispatcher.dispatcher_class import handle_updates - -GOOD_IMMEDIATE_SCHEDULE_XML = """ - - 6bf587ac-1d70-4e21-9a15-097f6292b9c4 - - - - swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d - - - - ota
sotaremote
updatefullno -
]]>
-
-
-
""" - -GOOD_SEVERAL_IMMEDIATE_SCHEDULE_XML = """ - - c9b74125-f3bb-440a-ad80-8d02090bd337 - - - - swupd-939fe48c-32da-40eb-a00f-acfdb43a5d6d - - - - ota
sotaremote
updatefullno -
]]>
-
-
- - - - swupd-88fff0ef-4fae-43a5-beb7-fe7d8d5e31cd - 2021-09-01T00:00:00 - - - - ota
sotaremote
updatefullno -
]]>
-
-
- - - - swupd-dea8fac4-dbdb-400e-ba11-4dd3c07ad270 - - - - ota
sotaremote
updatefullno -
]]>
-
-
-
""" - -SCHEDULE_SCHEMA_LOCATION = os.path.join( - os.path.dirname(__file__), - '..', - '..', - 'fpm-template', - 'usr', - 'share', - 'dispatcher-agent', - 'schedule_manifest_schema.xsd', -) - -EMBEDDED_SCHEMA_LOCATION = os.path.join( - os.path.dirname(__file__), - '..', - '..', - 'fpm-template', - 'usr', - 'share', - 'dispatcher-agent', - 'manifest_schema.xsd', -) - - -@pytest.fixture -def mock_disp_obj(): - return MockDispatcher.build_mock_dispatcher() - - -@pytest.fixture -def method_counter(mocker): - mock_method = mocker.patch.object(MockDispatcher, 'do_install') - yield mock_method - - -def test_run_one_immediate_scheduled_manifest(mock_disp_obj, method_counter, mocker): - # Mock the call to dispatcher.update_queue.get - mocker.patch.object(mock_disp_obj.update_queue, 'get', - return_value=['schedule', GOOD_IMMEDIATE_SCHEDULE_XML, "REQ12345"]) - - handle_updates(mock_disp_obj, - schedule_manifest_schema=SCHEDULE_SCHEMA_LOCATION, - manifest_schema=EMBEDDED_SCHEMA_LOCATION) - - # Assert that the do_install method is called once - assert method_counter.call_count == 1 - - -def test_run_several_immediate_scheduled_manifest(mock_disp_obj, method_counter, mocker): - mocker.patch('dispatcher.schedule.sqlite_manager.SqliteManager.create_schedule') - mocker.patch('dispatcher.schedule.sqlite_manager.SqliteManager.__init__', return_value=None) - # Mock the call to dispatcher.update_queue.get - mocker.patch.object(mock_disp_obj.update_queue, 'get', - return_value=['schedule', GOOD_SEVERAL_IMMEDIATE_SCHEDULE_XML, "REQ12345"]) - - handle_updates(mock_disp_obj, - schedule_manifest_schema=SCHEDULE_SCHEMA_LOCATION, - manifest_schema=EMBEDDED_SCHEMA_LOCATION) - - # Assert that the do_install method is called the correct number of times - assert method_counter.call_count == 2 diff --git a/inbm/dockerfiles/Dockerfile-check.m4 b/inbm/dockerfiles/Dockerfile-check.m4 index 5128e49e0..0245ecb2a 100644 --- a/inbm/dockerfiles/Dockerfile-check.m4 +++ b/inbm/dockerfiles/Dockerfile-check.m4 @@ -27,6 +27,7 @@ RUN source /venv-py3/bin/activate && \ types-requests==2.31.0.1 \ types-protobuf==5.26.0.20240422 \ pytest==7.4.3 \ + pytest-timeout==2.3.1 \ pytest-cov==4.1.0 \ pytest-mock==3.12.0 \ pytest-xdist==3.3.1 \ @@ -63,8 +64,8 @@ RUN source /venv-py3/bin/activate && \ set -o pipefail && \ mkdir -p /output/coverage && \ cd tests/unit && \ - pytest -n 1 --cov=inbm_common_lib --cov-report=term-missing --cov-fail-under=82 inbm_common_lib 2>&1 | tee /output/coverage/inbm-common-lib-coverage.txt && \ - pytest -n 1 --cov=inbm_lib --cov-report=term-missing --cov-fail-under=82 inbm_lib 2>&1 | tee /output/coverage/inbm-lib-coverage.txt && \ + pytest --timeout=10 -n 1 --cov=inbm_common_lib --cov-report=term-missing --cov-fail-under=82 inbm_common_lib 2>&1 | tee /output/coverage/inbm-common-lib-coverage.txt && \ + pytest --timeout=10 -n 1 --cov=inbm_lib --cov-report=term-missing --cov-fail-under=82 inbm_lib 2>&1 | tee /output/coverage/inbm-lib-coverage.txt && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ touch /passed.txt @@ -92,7 +93,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 1 --cov=inbc --cov-report=term-missing --cov-fail-under=84 tests/unit 2>&1 | tee /output/coverage/inbc-coverage.txt + pytest --timeout=10 -n 1 --cov=inbc --cov-report=term-missing --cov-fail-under=84 tests/unit 2>&1 | tee /output/coverage/inbc-coverage.txt # ---diagnostic agent--- @@ -118,7 +119,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 1 --cov=diagnostic --cov-report=term-missing --cov-fail-under=80 tests/unit 2>&1 | tee /output/coverage/diagnostic-coverage.txt + pytest --timeout=10 -n 1 --cov=diagnostic --cov-report=term-missing --cov-fail-under=80 tests/unit 2>&1 | tee /output/coverage/diagnostic-coverage.txt # ---dispatcher agent--- @@ -148,7 +149,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 3 --cov=dispatcher --cov-report=term-missing --cov-fail-under=81 tests/unit 2>&1 | tee /output/coverage/dispatcher-coverage.txt + pytest --timeout=10 -n 3 --cov=dispatcher --cov-report=term-missing --cov-fail-under=81 tests/unit 2>&1 | tee /output/coverage/dispatcher-coverage.txt # ---cloudadapter agent--- @@ -174,7 +175,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 10 --cov=cloudadapter --cov-report=term-missing --cov-fail-under=90 tests/unit 2>&1 | tee /output/coverage/cloudadapter-coverage.txt + pytest --timeout=10 -n 10 --cov=cloudadapter --cov-report=term-missing --cov-fail-under=89 tests/unit 2>&1 | tee /output/coverage/cloudadapter-coverage.txt # ---telemetry agent--- @@ -200,7 +201,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 1 --cov=telemetry --cov-report=term-missing --cov-fail-under=83 telemetry/tests/unit 2>&1 | tee /output/coverage/telemetry-coverage.txt + pytest --timeout=10 -n 1 --cov=telemetry --cov-report=term-missing --cov-fail-under=83 telemetry/tests/unit 2>&1 | tee /output/coverage/telemetry-coverage.txt # ---configuration agent--- @@ -226,7 +227,7 @@ RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ export PYTHONPATH=$PYTHONPATH:$(pwd) && \ - pytest -n 1 --cov=configuration --cov-report=term-missing --cov-fail-under=88 configuration/tests/unit 2>&1 | tee /output/coverage/configuration-coverage.txt + pytest --timeout=10 -n 1 --cov=configuration --cov-report=term-missing --cov-fail-under=88 configuration/tests/unit 2>&1 | tee /output/coverage/configuration-coverage.txt # output container FROM base AS output diff --git a/inbm/dockerfiles/image.main.m4 b/inbm/dockerfiles/image.main.m4 index e7f1993cd..b0b58d8d7 100644 --- a/inbm/dockerfiles/image.main.m4 +++ b/inbm/dockerfiles/image.main.m4 @@ -2,14 +2,14 @@ # SPDX-License-Identifier: Apache-2.0 # base image with all dependencies for building -FROM registry.hub.docker.com/library/ubuntu:20.04 as base +FROM registry.hub.docker.com/library/ubuntu:20.04 AS base RUN echo 'force cache refresh 20240212' include(`commands.base-setup.m4') # build a virtual environment for each agent to build from # py3 venv -FROM base as venv-py3-x86_64 +FROM base AS venv-py3-x86_64 WORKDIR / RUN python3.12 -m venv /venv-py3 && \ source /venv-py3/bin/activate && \ @@ -24,7 +24,7 @@ RUN rm /usr/lib/x86_64-linux-gnu/libreadline* # extra protection against libread # ---inbc-program--- -FROM venv-py3-x86_64 as venv-inbc-py3 +FROM venv-py3-x86_64 AS venv-inbc-py3 COPY inbc-program/requirements.txt /src/inbc-program/requirements.txt WORKDIR /src/inbc-program RUN source /venv-py3/bin/activate && \ @@ -35,7 +35,7 @@ COPY inbm/packaging /src/packaging COPY inbc-program /src/inbc-program COPY inbm-lib/ /src/inbm-lib/ -FROM venv-inbc-py3 as inbc-py3 +FROM venv-inbc-py3 AS inbc-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output && \ set -o pipefail && \ @@ -44,7 +44,7 @@ RUN source /venv-py3/bin/activate && \ # ---diagnostic agent--- -FROM venv-py3-x86_64 as venv-diagnostic-py3 +FROM venv-py3-x86_64 AS venv-diagnostic-py3 COPY inbm/diagnostic-agent/requirements.txt /src/diagnostic-agent/requirements.txt WORKDIR /src/diagnostic-agent RUN source /venv-py3/bin/activate && \ @@ -54,7 +54,7 @@ COPY inbm/packaging /src/packaging COPY inbm/diagnostic-agent /src/diagnostic-agent COPY inbm-lib/ /src/inbm-lib/ -FROM venv-diagnostic-py3 as diagnostic-py3 +FROM venv-diagnostic-py3 AS diagnostic-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ @@ -64,7 +64,7 @@ RUN source /venv-py3/bin/activate && \ # ---dispatcher agent--- -FROM venv-py3-x86_64 as venv-dispatcher-py3 +FROM venv-py3-x86_64 AS venv-dispatcher-py3 COPY inbm/dispatcher-agent/requirements.txt /src/dispatcher-agent/requirements.txt WORKDIR /src/dispatcher-agent RUN source /venv-py3/bin/activate && \ @@ -82,7 +82,7 @@ ARG COMMIT RUN mkdir -p /src/dispatcher-agent/fpm-template/usr/share/intel-manageability/ && \ ( echo "Version: ${VERSION}" && echo "Commit: ${COMMIT}" ) >/src/dispatcher-agent/fpm-template/usr/share/intel-manageability/inbm-version.txt -FROM venv-dispatcher-py3 as dispatcher-py3 +FROM venv-dispatcher-py3 AS dispatcher-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ @@ -92,7 +92,7 @@ RUN source /venv-py3/bin/activate && \ # ---cloudadapter agent--- -FROM venv-py3-x86_64 as venv-cloudadapter-py3 +FROM venv-py3-x86_64 AS venv-cloudadapter-py3 COPY inbm/cloudadapter-agent/requirements.txt /src/cloudadapter-agent/requirements.txt WORKDIR /src/cloudadapter-agent RUN source /venv-py3/bin/activate && \ @@ -103,7 +103,7 @@ COPY inbm/packaging /src/packaging COPY inbm/cloudadapter-agent /src/cloudadapter-agent COPY inbm-lib/ /src/inbm-lib/ -FROM venv-cloudadapter-py3 as cloudadapter-py3 +FROM venv-cloudadapter-py3 AS cloudadapter-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ @@ -113,7 +113,7 @@ RUN source /venv-py3/bin/activate && \ # ---telemetry agent--- -FROM venv-py3-x86_64 as venv-telemetry-py3 +FROM venv-py3-x86_64 AS venv-telemetry-py3 COPY inbm/telemetry-agent/requirements.txt /src/telemetry-agent/requirements.txt WORKDIR /src/telemetry-agent RUN source /venv-py3/bin/activate && \ @@ -123,7 +123,7 @@ COPY inbm/packaging /src/packaging COPY inbm/telemetry-agent /src/telemetry-agent COPY inbm-lib/ /src/inbm-lib/ -FROM venv-telemetry-py3 as telemetry-py3 +FROM venv-telemetry-py3 AS telemetry-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ @@ -132,7 +132,7 @@ RUN source /venv-py3/bin/activate && \ # ---configuration agent--- -FROM venv-py3-x86_64 as venv-configuration-py3 +FROM venv-py3-x86_64 AS venv-configuration-py3 COPY inbm/configuration-agent/requirements.txt /src/configuration-agent/requirements.txt WORKDIR /src/configuration-agent RUN source /venv-py3/bin/activate && \ @@ -142,7 +142,7 @@ COPY inbm/packaging /src/packaging COPY inbm/configuration-agent /src/configuration-agent COPY inbm-lib/ /src/inbm-lib/ -FROM venv-configuration-py3 as configuration-py3 +FROM venv-configuration-py3 AS configuration-py3 RUN source /venv-py3/bin/activate && \ mkdir -p /output/coverage && \ set -o pipefail && \ @@ -153,7 +153,7 @@ RUN source /venv-py3/bin/activate && \ # ---trtl--- -FROM registry.hub.docker.com/library/golang:1.22-bookworm as trtl-build +FROM registry.hub.docker.com/library/golang:1.22-bookworm AS trtl-build WORKDIR / ENV GOPATH /build/go ENV PATH $PATH:$GOROOT/bin:$GOPATH/bin @@ -172,7 +172,7 @@ RUN go get -t github.com/stretchr/testify/assert RUN scripts/test-trtl COPY inbm/version.txt /build/go/src/iotg-inb/version.txt -FROM base as trtl-package +FROM base AS trtl-package COPY --from=trtl-build /build /build WORKDIR /build/go/src/iotg-inb/trtl RUN scripts/package-trtl deb EVAL @@ -181,25 +181,25 @@ RUN rm -rf /output/ && mv ./output/ /output/ # --inb-provision-certs- -FROM registry.hub.docker.com/library/golang:1.22-bookworm as inb-provision-certs +FROM registry.hub.docker.com/library/golang:1.22-bookworm AS inb-provision-certs COPY inbm/fpm/inb-provision-certs /inb-provision-certs RUN cd /inb-provision-certs && CGO_ENABLED=0 go build -trimpath -mod=readonly -gcflags="all=-spectre=all -N -l" -asmflags="all=-spectre=all" -ldflags="all=-s -w" . && rm -rf /output/ && mkdir /output && cp /inb-provision-certs/inb-provision-certs /output/inb-provision-certs # --inb-provision-cloud- -FROM registry.hub.docker.com/library/golang:1.22-bookworm as inb-provision-cloud +FROM registry.hub.docker.com/library/golang:1.22-bookworm AS inb-provision-cloud COPY inbm/fpm/inb-provision-cloud /inb-provision-cloud RUN cd /inb-provision-cloud && go test . && CGO_ENABLED=0 go build -trimpath -mod=readonly -gcflags="all=-spectre=all -N -l" -asmflags="all=-spectre=all" -ldflags="all=-s -w" . && rm -rf /output/ && mkdir /output && cp /inb-provision-cloud/inb-provision-cloud /output/inb-provision-cloud # --inb-provision-ota-cert- -FROM registry.hub.docker.com/library/golang:1.22-bookworm as inb-provision-ota-cert +FROM registry.hub.docker.com/library/golang:1.22-bookworm AS inb-provision-ota-cert COPY inbm/fpm/inb-provision-ota-cert /inb-provision-ota-cert RUN cd /inb-provision-ota-cert && CGO_ENABLED=0 go build -trimpath -mod=readonly -gcflags="all=-spectre=all -N -l" -asmflags="all=-spectre=all" -ldflags="all=-s -w" . && rm -rf /output/ && mkdir /output && cp /inb-provision-ota-cert/inb-provision-ota-cert /output/inb-provision-ota-cert # --packaging-- -FROM base as packaging +FROM base AS packaging COPY inbm/packaging /src/packaging WORKDIR /src/packaging RUN rm -rf output/ && \ @@ -216,7 +216,7 @@ RUN mkdir -p output/certs && \ WORKDIR /src/packaging RUN cp -v docker-sample-container/docker/certs/succeed_rpm_key.pem /output -FROM base as fpm +FROM base AS fpm WORKDIR / RUN wget https://github.com/certifi/python-certifi/archive/refs/tags/2023.07.22.zip -O python-certifi-src-2023.07.22.zip COPY inbm/fpm /src/fpm @@ -240,7 +240,7 @@ RUN make build && \ mv output/* /output # output container -FROM registry.hub.docker.com/library/ubuntu:20.04 as output-main +FROM registry.hub.docker.com/library/ubuntu:20.04 AS output-main COPY --from=packaging /output /packaging COPY --from=inbc-py3 /output /inbc COPY --from=diagnostic-py3 /output /diagnostic diff --git a/inbm/fpm/mqtt/template/etc/intel-manageability/public/mqtt-broker/acl.file b/inbm/fpm/mqtt/template/etc/intel-manageability/public/mqtt-broker/acl.file index bc506331f..962842ab3 100644 --- a/inbm/fpm/mqtt/template/etc/intel-manageability/public/mqtt-broker/acl.file +++ b/inbm/fpm/mqtt/template/etc/intel-manageability/public/mqtt-broker/acl.file @@ -12,6 +12,7 @@ topic write manageability/event topic write manageability/cmd/+ topic write manageability/response topic write manageability/response/+ +topic write manageability/update topic write ma/configuration/update/+ topic write dispatcher/query @@ -22,6 +23,7 @@ topic read manageability/response topic read manageability/response/+ topic read manageability/telemetry topic read manageability/event +topic read manageability/update user inbc-program topic write manageability/request/# @@ -53,6 +55,7 @@ user cmd-program topic write manageability/event topic write manageability/response topic write manageability/telemetry +topic write manageability/update topic read manageability/cmd/+ user ucc-native-service