diff --git a/inbc-program/inbc/parser/parser.py b/inbc-program/inbc/parser/parser.py index f20b4a7de..5679d1b46 100644 --- a/inbc-program/inbc/parser/parser.py +++ b/inbc-program/inbc/parser/parser.py @@ -224,7 +224,7 @@ def parse_sota_args(self) -> None: parser_sota.add_argument('--reboot', '-rb', default='yes', required=False, choices=['yes', 'no'], help='Type of information [ yes | no ]') parser_sota.add_argument('--mode', '-m', default='full', - required=False, choices=['full', 'download-only', 'no-download']) + required=False, choices=['full', 'download-only', 'no-download', 'cancel']) parser_sota.add_argument('--package-list', '-p', required=False, type=lambda x: validate_package_list(x), help='Comma-separated list of package names to install') diff --git a/inbm/Changelog.md b/inbm/Changelog.md index d794a6067..b663b08af 100644 --- a/inbm/Changelog.md +++ b/inbm/Changelog.md @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/). ## NEXT - MMMM-DD-YY +### Added + - (NEXMANAGE-737) Enable sota cancel mode + ### Fixed - (NEXMANAGE-872) Fix provision-tc issue in TiberOS - cannot overwrite /etc/dispatcher.environment - (NEXMANAGE-846) Fix granular log raise error when granular log file is empty diff --git a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py index 468f30779..d3b2c5386 100644 --- a/inbm/dispatcher-agent/dispatcher/dispatcher_class.py +++ b/inbm/dispatcher-agent/dispatcher/dispatcher_class.py @@ -12,7 +12,7 @@ import signal import sys from queue import Queue -from threading import Thread, active_count, Lock +from threading import Thread, active_count, Lock, Event from time import sleep from typing import Optional, Any, Mapping, Tuple, Sequence @@ -55,6 +55,7 @@ from .sota.os_factory import SotaOsFactory from .sota.sota import SOTA from .sota.sota_error import SotaError +from .sota.cancel import cancel_thread from .workload_orchestration import WorkloadOrchestration from inbm_lib.xmlhandler import * from inbm_lib.version import get_friendly_inbm_version_commit @@ -94,6 +95,8 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv # Initialize update_queue with a capacity of 1 to ensure serialized handling of updates. self.update_queue: Queue[Tuple[str, str, Optional[str]]] = Queue(1) self._thread_count = 1 + self._thread_list: list[Thread] = [] + self._active_thread_manifest: Optional[str] = None self._sota_repos = None self.sota_mode = None self._package_list: str = "" @@ -115,6 +118,7 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv self.sqlite_mgr = SqliteManager() self.ap_scheduler = APScheduler(sqlite_mgr=self.sqlite_mgr) + self._cancel_event = Event() def stop(self) -> None: self.RUNNING = False @@ -186,7 +190,13 @@ def _sig_handler(signo, frame) -> None: if active_count() - active_start_count < self._thread_count: worker = Thread(target=handle_updates, args=(self,)) worker.setDaemon(True) + self._thread_list.append(worker) worker.start() + + # Periodically check if processes have finished. If process finished, remove it from the list. + for thread in self._thread_list[:]: + if not thread.is_alive(): + self._thread_list.remove(thread) sleep(1) self._dispatcher_broker.mqtt_publish(f'{AGENT}/state', 'dead', retain=True) @@ -310,6 +320,8 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st result: Result = Result() logger.debug("do_install") parsed_head = None + # Assumption is that there is only one active OTA thread at a time + self._active_thread_manifest = xml try: # TODO: Split into multiple try/except blocks type_of_manifest, parsed_head = \ _check_type_validate_manifest(xml, schema_location=schema_location) @@ -411,7 +423,8 @@ def _do_ota_update(self, ota_type: str, repo_type: str, resource: dict, self._sota_repos, self._install_check_service, self._update_logger, - self.config_dbs) + self.config_dbs, + self._cancel_event) p = factory.create_parser() # NOTE: p.parse can raise one of the *otaError exceptions @@ -440,7 +453,8 @@ def _validate_pota_manifest(self, repo_type: str, self._sota_repos, self._install_check_service, self._update_logger, - self.config_dbs) + self.config_dbs, + self._cancel_event) p = factory.create_parser() # NOTE: p.parse can raise one of the *otaError exceptions parsed_manifest = p.parse(ota_list[ota], kwargs, parsed_head) @@ -497,7 +511,32 @@ def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None: request_type = topic.split('/')[2] request_id = topic.split('/')[3] if len(topic.split('/')) > 3 else None manifest = payload - self._add_request_to_queue(request_type, manifest, request_id) + if not self._handle_cancel_request(request_type, manifest): + self._add_request_to_queue(request_type, manifest, request_id) + + def _handle_cancel_request(self, request_type: str, manifest: str) -> bool: + """ + Check if it is a SOTA cancel request. If it is, send the terminate signal to current process. + + @param request_type: type of the request + @param manifest: manifest to be processed + @return: True if the request has been processed; False if no request has been handled. + """ + if request_type == "install": + type_of_manifest, parsed_head = \ + _check_type_validate_manifest(manifest) + type_of_active_manifest = active_thread_parsed_head = None + if self._active_thread_manifest: + type_of_active_manifest, active_thread_parsed_head = \ + _check_type_validate_manifest(self._active_thread_manifest) + result = cancel_thread(type_of_manifest, parsed_head, self._thread_list, + type_of_active_manifest, active_thread_parsed_head, + self._dispatcher_broker, self._cancel_event) + if result: + logger.debug(f"Request cancel complete.") + self._send_result(str(Result(CODE_OK, "Request complete."))) + return True + return False def _on_message(self, topic: str, payload: Any, qos: int) -> None: """Called when a message is received from _telemetry-agent @@ -619,6 +658,7 @@ def invoke_sota(self, snapshot: Optional[Any] = None, action: Optional[Any] = No self._update_logger, self._sota_repos, self._install_check_service, + self._cancel_event, snapshot, action) sota_instance.execute(self.proceed_without_rollback) diff --git a/inbm/dispatcher-agent/dispatcher/ota_factory.py b/inbm/dispatcher-agent/dispatcher/ota_factory.py index 879ab2936..f55c90e3d 100644 --- a/inbm/dispatcher-agent/dispatcher/ota_factory.py +++ b/inbm/dispatcher-agent/dispatcher/ota_factory.py @@ -6,6 +6,7 @@ """ import abc import logging +import threading from typing import Any, Optional, Mapping from .config_dbs import ConfigDbs @@ -53,7 +54,8 @@ def get_factory(ota_type, sota_repos: Optional[str], install_check_service: InstallCheckService, update_logger: UpdateLogger, - dbs: ConfigDbs) -> "OtaFactory": + dbs: ConfigDbs, + cancel_event: threading.Event) -> "OtaFactory": """Create an OTA factory of a specified OTA type @param ota_type: The OTA type @@ -64,6 +66,7 @@ def get_factory(ota_type, @param install_check_service: provides install_check @param update_logger: UpdateLogger (expected to update after OTA) @param dbs: ConfigDbs.ON or ConfigDbs.WARN or ConfigDbs.OFF + @param cancel_event: Event used to stop the downloading process @raise ValueError: Unsupported OTA type """ @@ -72,7 +75,7 @@ def get_factory(ota_type, return FotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger) if ota_type == OtaType.SOTA.name: return SotaFactory(repo_type, dispatcher_broker, proceed_without_rollback, - sota_repos, install_check_service, update_logger) + sota_repos, install_check_service, update_logger, cancel_event) if ota_type == OtaType.AOTA.name: return AotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger, dbs=dbs) if ota_type == OtaType.POTA.name: @@ -116,7 +119,8 @@ class SotaFactory(OtaFactory): @param proceed_without_rollback: Is it OK to run SOTA without rollback ability? @param install_check_service: provides InstallCheckService @param sota_repos: new Ubuntu/Debian mirror (or None) - @param update_logger: UpdateLogger (expected to update after OTA) + @param update_logger: UpdateLogger (expected to update after OTA) + @param cancel_event: Event used to stop the downloading process """ def __init__(self, @@ -125,13 +129,15 @@ def __init__(self, proceed_without_rollback: bool, sota_repos: Optional[str], install_check_service: InstallCheckService, - update_logger: UpdateLogger) -> None: + update_logger: UpdateLogger, + cancel_event: threading.Event) -> None: super().__init__(repo_type, install_check_service) self._sota_repos = sota_repos self._proceed_without_rollback = proceed_without_rollback self._update_logger = update_logger self._dispatcher_broker = dispatcher_broker + self._cancel_event = cancel_event def create_parser(self) -> OtaParser: logger.debug(" ") @@ -145,7 +151,8 @@ def create_thread(self, parsed_manifest: Mapping[str, Optional[Any]]) -> OtaThre self._sota_repos, self._install_check_service, parsed_manifest, - self._update_logger) + self._update_logger, + self._cancel_event) class AotaFactory(OtaFactory): diff --git a/inbm/dispatcher-agent/dispatcher/ota_thread.py b/inbm/dispatcher-agent/dispatcher/ota_thread.py index ecd58b5aa..5083e295f 100644 --- a/inbm/dispatcher-agent/dispatcher/ota_thread.py +++ b/inbm/dispatcher-agent/dispatcher/ota_thread.py @@ -7,6 +7,7 @@ import abc import logging import os +import threading from threading import Lock from typing import Optional, Any, Mapping @@ -142,6 +143,7 @@ class SotaThread(OtaThread): @param install_check_service: provides install_check @param parsed_manifest: parameters from OTA manifest @param update_logger: UpdateLogger instance; expected to update when done with OTA + @param cancel_event: Event used to stop the downloading process @return (dict): dict representation of COMMAND_SUCCESS or OTA_FAILURE/OTA_FAILURE_IN_PROGRESS """ @@ -152,13 +154,15 @@ def __init__(self, sota_repos: Optional[str], install_check_service: InstallCheckService, parsed_manifest: Mapping[str, Optional[Any]], - update_logger: UpdateLogger) -> None: + update_logger: UpdateLogger, + cancel_event: threading.Event) -> None: super().__init__(repo_type, parsed_manifest, install_check_service=install_check_service) self._sota_repos = sota_repos self._proceed_without_rollback = proceed_without_rollback self._update_logger = update_logger self._dispatcher_broker = dispatcher_broker + self._cancel_event = cancel_event def start(self) -> Result: # pragma: no cover """Starts the SOTA thread and which checks for existing locks before delegating to @@ -177,6 +181,7 @@ def start(self) -> Result: # pragma: no cover dispatcher_broker=self._dispatcher_broker, update_logger=self._update_logger, sota_repos=self._sota_repos, + cancel_event=self._cancel_event, install_check_service=self._install_check_service) try: sota_instance.execute(self._proceed_without_rollback) @@ -200,6 +205,7 @@ def check(self) -> None: dispatcher_broker=self._dispatcher_broker, update_logger=self._update_logger, sota_repos=self._sota_repos, + cancel_event=self._cancel_event, install_check_service=self._install_check_service) sota_instance.check() except SotaError as e: diff --git a/inbm/dispatcher-agent/dispatcher/sota/cancel.py b/inbm/dispatcher-agent/dispatcher/sota/cancel.py new file mode 100644 index 000000000..628707e49 --- /dev/null +++ b/inbm/dispatcher-agent/dispatcher/sota/cancel.py @@ -0,0 +1,88 @@ +""" + Method to handle cancel request + + Copyright (C) 2017-2024 Intel Corporation + SPDX-License-Identifier: Apache-2.0 +""" + +import logging +from typing import Optional +from threading import Event +from inbm_lib.xmlhandler import XmlHandler +from threading import Thread +from .constants import SOTA_CACHE +from ..constants import OtaType +from dispatcher.dispatcher_exception import DispatcherException +from dispatcher.packagemanager.local_repo import DirectoryRepo +from dispatcher.dispatcher_broker import DispatcherBroker +from dispatcher.common.result_constants import Result, CODE_OK, CODE_BAD_REQUEST + +logger = logging.getLogger(__name__) + + +def cancel_thread(type_of_manifest: str, parsed_head: XmlHandler, thread_list: list[Thread], + type_of_active_manifest: Optional[str], active_thread_parsed_head: Optional[XmlHandler], + dispatcher_broker: DispatcherBroker, cancel_event: Event) -> bool: + """ + Cancel the current active thread by sending the terminate signal. + + @param type_of_manifest: type of the request + @param parsed_head: The root parsed xml + @param thread_list: List of the active thread + @param type_of_active_manifest: type of the request on running thread + @param active_thread_parsed_head: The root parsed xml of running thread + @param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services + @param cancel_event: Event used to stop the downloading process + @return: True if the request has been processed; False if no request has been handled. + """ + if type_of_manifest == 'ota': + header = parsed_head.get_children('ota/header') + ota_type = header['type'] + resource = parsed_head.get_children(f'ota/type/{ota_type}') + if ota_type == OtaType.SOTA.name.lower(): + sota_mode = resource.get('mode', None) + if sota_mode == 'cancel': + logger.debug(f"Receive sota cancel request.") + # If the active thread is not SOTA download-only, forbid the cancel request. + if type_of_active_manifest and active_thread_parsed_head: + if not is_active_ota_sota_download_only(type_of_active_manifest, active_thread_parsed_head): + dispatcher_broker.send_result( + str(Result(CODE_BAD_REQUEST, "Current thread is not SOTA download-only. " + "Cannot proceed with the cancel request."))) + return True + else: + dispatcher_broker.send_result(str(Result(CODE_BAD_REQUEST, "Running thread manifest not found."))) + return True + + # The list should only contain one OTA process. + for thread in thread_list: + if thread.is_alive(): + cancel_event.set() + # Wait thread to gracefully exit + logger.debug(f"Waiting thread to exit...") + thread.join(timeout=300) + logger.debug(f"Request cancel complete.") + # Reset the event flag + cancel_event.clear() + return True + return False + + +def is_active_ota_sota_download_only(type_of_active_manifest: str, active_parsed_head: XmlHandler) -> bool: + """ + Check whether the current active thread is SOTA download-only mode. + + @param type_of_active_manifest: type of the request + @param active_parsed_head: The root parsed xml + @return: True if it is SOTA download-only; False if not. + """ + logger.debug("") + if type_of_active_manifest == 'ota': + header = active_parsed_head.get_children('ota/header') + ota_type = header['type'] + resource = active_parsed_head.get_children(f'ota/type/{ota_type}') + if ota_type == OtaType.SOTA.name.lower(): + sota_mode = resource.get('mode', None) + if sota_mode == 'download-only': + return True + return False diff --git a/inbm/dispatcher-agent/dispatcher/sota/downloader.py b/inbm/dispatcher-agent/dispatcher/sota/downloader.py index aa6278447..069702464 100644 --- a/inbm/dispatcher-agent/dispatcher/sota/downloader.py +++ b/inbm/dispatcher-agent/dispatcher/sota/downloader.py @@ -7,6 +7,7 @@ from abc import abstractmethod import logging +import threading from datetime import datetime from typing import Optional @@ -35,7 +36,8 @@ def download(self, repo: IRepo, username: Optional[str], password: Optional[str], - release_date: Optional[str]) -> None: + release_date: Optional[str], + cancel_event: threading.Event) -> None: """Downloads update/upgrade and places capsule file in local cache. @param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services @@ -44,6 +46,7 @@ def download(self, @param username: username to use for download @param password: password to use for download @param release_date: manifest release date + @param cancel_event: Event used to stop the downloading process """ logger.debug("") @@ -84,7 +87,8 @@ def download(self, repo: IRepo, username: Optional[str], password: Optional[str], - release_date: Optional[str]) -> None: + release_date: Optional[str], + cancel_event: threading.Event) -> None: """downloads Debian-based update""" logger.debug("Debian-based OS does not require a file download to " @@ -109,7 +113,8 @@ def download(self, repo: IRepo, username: Optional[str], password: Optional[str], - release_date: Optional[str]) -> None: + release_date: Optional[str], + cancel_event: threading.Event) -> None: """STUB: downloads Windows update @param uri: URI of the source location @@ -118,6 +123,7 @@ def download(self, @param password: password to use for download @param release_date: manifest release date @raises SotaError: release date is not valid + @param cancel_event: Event used to stop the downloading process """ logger.debug("") @@ -138,7 +144,8 @@ def download(self, repo: IRepo, username: Optional[str], password: Optional[str], - release_date: Optional[str]) -> None: + release_date: Optional[str], + cancel_event: threading.Event) -> None: """Downloads files and places image in local cache @param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services @@ -148,6 +155,7 @@ def download(self, @param password: password to use for download @param release_date: manifest release date @raises SotaError: release date is not valid + @param cancel_event: Event used to stop the downloading process """ if not self.check_release_date(release_date): @@ -180,7 +188,8 @@ def download(self, repo: IRepo, username: Optional[str], password: Optional[str], - release_date: Optional[str]) -> None: + release_date: Optional[str], + cancel_event: threading.Event) -> None: """Downloads files and places image in local cache @param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services @@ -190,6 +199,7 @@ def download(self, @param password: password to use for download @param release_date: manifest release date @raises SotaError: release date is not valid + @param cancel_event: Event used to stop the downloading process """ if uri is None: @@ -202,7 +212,8 @@ def download(self, repo=repo, umask=UMASK_OTA, username=username, - token=password) + token=password, + cancel_event=cancel_event) def check_release_date(self, release_date: Optional[str]) -> bool: raise NotImplementedError() \ No newline at end of file diff --git a/inbm/dispatcher-agent/dispatcher/sota/sota.py b/inbm/dispatcher-agent/dispatcher/sota/sota.py index 501265aff..63b80e7de 100644 --- a/inbm/dispatcher-agent/dispatcher/sota/sota.py +++ b/inbm/dispatcher-agent/dispatcher/sota/sota.py @@ -7,6 +7,7 @@ import logging import os +import threading import time import threading from typing import Any, List, Optional, Union, Mapping @@ -84,6 +85,7 @@ def __init__(self, update_logger: UpdateLogger, sota_repos: Optional[str], install_check_service: InstallCheckService, + cancel_event: threading.Event, snapshot: Optional[Any] = None, action: Optional[Any] = None) -> None: """SOTA thread instance @@ -94,6 +96,7 @@ def __init__(self, @param sota_repos: new Ubuntu/Debian mirror (or None) @param update_logger: UpdateLogger instance--expected to notify it with update status @param kwargs: + @param cancel_event: Event used to stop the downloading process """ self._parsed_manifest = parsed_manifest @@ -115,6 +118,7 @@ def __init__(self, self._update_logger = update_logger self._dispatcher_broker = dispatcher_broker self._granular_log_handler = GranularLogHandler() + self._cancel_event = cancel_event try: manifest_package_list = parsed_manifest['package_list'] @@ -288,7 +292,8 @@ def execute(self, proceed_without_rollback: bool, skip_sleeps: bool = False) -> time_to_wait_before_reboot=time_to_wait_before_reboot, release_date=release_date) - def _download_sota_files(self, sota_cache_repo: IRepo, release_date: Optional[str]) -> None: + def _download_sota_files(self, sota_cache_repo: IRepo, release_date: Optional[str], + cancel_event: threading.Event,) -> None: """Download SOTA files from either a remote source or use a local source, and clean the cache directory. This method is responsible for downloading the necessary SOTA files from the specified remote source or @@ -296,6 +301,7 @@ def _download_sota_files(self, sota_cache_repo: IRepo, release_date: Optional[st @param sota_cache_repo: Repo object to store the downloaded files, and to delete all files from cache directory. @param release_date: The release date of the SOTA manifest, used for filtering downloads from the remote source. + @param cancel_event: Event used to stop the downloading process """ sota_cache_repo.delete_all() # clean cache directory @@ -307,12 +313,12 @@ def _download_sota_files(self, sota_cache_repo: IRepo, release_date: Optional[st if self._uri is None: downloader.download( self._dispatcher_broker, None, sota_cache_repo, - self._username, self._password, release_date) + self._username, self._password, release_date, cancel_event) else: downloader.download( self._dispatcher_broker, canonicalize_uri( self._uri), sota_cache_repo, - self._username, self._password, release_date) + self._username, self._password, release_date, cancel_event) def execute_from_manifest(self, setup_helper: SetupHelper, @@ -340,7 +346,7 @@ def execute_from_manifest(self, try: if setup_helper.pre_processing(): if self.sota_mode != 'no-download': - self._download_sota_files(sota_cache_repo, release_date) + self._download_sota_files(sota_cache_repo, release_date, self._cancel_event) download_success = True snapshotter.take_snapshot() cmd_list = self.calculate_and_execute_sota_upgrade(sota_cache_repo) @@ -355,6 +361,13 @@ def execute_from_manifest(self, if self.sota_mode != 'download-only': snapshotter.recover(rebooter, time_to_wait_before_reboot) except (DispatcherException, SotaError, UrlSecurityException, PermissionError) as e: + try: + # Remove the downloaded files inside the cache repo if error happens. + sota_cache_repo.delete_all() + except DispatcherException as err: + # DispatcherException may raise if the repo doesn't exist. + logger.debug(err) + msg = f"Caught exception during SOTA: {str(e)}" logger.debug(msg) self._dispatcher_broker.telemetry(str(e)) diff --git a/inbm/dispatcher-agent/dispatcher/sota/tiber_util.py b/inbm/dispatcher-agent/dispatcher/sota/tiber_util.py index 83327de7c..d10ca28aa 100644 --- a/inbm/dispatcher-agent/dispatcher/sota/tiber_util.py +++ b/inbm/dispatcher-agent/dispatcher/sota/tiber_util.py @@ -7,6 +7,7 @@ import logging import os import requests +import threading from requests import HTTPError from requests.exceptions import ProxyError, ChunkedEncodingError, ContentDecodingError, ConnectionError @@ -24,7 +25,8 @@ def tiber_download(dispatcher_broker: DispatcherBroker, uri: CanonicalUri, - repo: IRepo, username: Optional[str], token: str, umask: int) -> None: + repo: IRepo, username: Optional[str], token: str, umask: int, + cancel_event: threading.Event,) -> None: """Downloads files and places capsule file in path mentioned by manifest file. @param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services @@ -33,6 +35,7 @@ def tiber_download(dispatcher_broker: DispatcherBroker, uri: CanonicalUri, @param username: username to use for download @param token: token to use for download @param umask: file permission mask + @param cancel_event: Event used to stop the downloading process @raises SotaError: any exception """ dispatcher_broker.telemetry(f'Package to be fetched from {uri.value}') @@ -60,7 +63,7 @@ def tiber_download(dispatcher_broker: DispatcherBroker, uri: CanonicalUri, "Authorization": f"Bearer {token}" } - enough_space = is_enough_space_to_download(uri.value, repo, headers) + enough_space = is_enough_space_to_download(uri.value, repo, headers, cancel_event) if not enough_space: err_msg = " Insufficient free space available on " + shlex.quote(repo.get_repo_path()) + \ @@ -76,8 +79,14 @@ def tiber_download(dispatcher_broker: DispatcherBroker, uri: CanonicalUri, dispatcher_broker.telemetry(info_msg) try: - with requests.get(url=uri.value, headers=headers) as response: - repo.add(filename=file_name, contents=response.content) + with requests.get(url=uri.value, headers=headers, stream=True) as response: + response.raise_for_status() + with open(os.open(os.path.join(repo.get_repo_path(), file_name), os.O_CREAT | os.O_WRONLY), 'wb') \ + as destination_file: + for chunk in response.iter_content(chunk_size=16384): + if cancel_event.is_set(): + raise SotaError("Download cancelled.") + destination_file.write(chunk) except (HTTPError, OSError) as err: raise SotaError(f'OTA Fetch Failed: {err}') @@ -86,7 +95,8 @@ def tiber_download(dispatcher_broker: DispatcherBroker, uri: CanonicalUri, def is_enough_space_to_download(manifest_uri: str, destination_repo: IRepo, - headers: Any) -> bool: + headers: Any, + cancel_event: threading.Event) -> bool: """Checks if enough free space exists on platform to hold download. Calculates the file size from the OCI server and checks if required free space is available on @@ -94,11 +104,14 @@ def is_enough_space_to_download(manifest_uri: str, @param manifest_uri: registry manifest uri @param destination_repo: desired download destination @param headers: headers that contains jwt_token to access the release server + @param cancel_event: Event used to stop the downloading process + + @return: True if space is enough; Otherwise False. """ try: logger.debug(f"Checking file size with manifest uri: {manifest_uri}") - with requests.get(url=manifest_uri, headers=headers) as response: + with requests.get(url=manifest_uri, headers=headers, stream=True) as response: response.raise_for_status() # Read Content-Length header try: @@ -111,6 +124,8 @@ def is_enough_space_to_download(manifest_uri: str, for chunk in response.iter_content(chunk_size=16384): if chunk: content_length += len(chunk) + if cancel_event.is_set(): + raise SotaError("Download cancelled.") except HTTPError as e: if e.response: diff --git a/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd b/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd index d787263a8..3f6f579e3 100644 --- a/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd +++ b/inbm/dispatcher-agent/fpm-template/usr/share/dispatcher-agent/manifest_schema.xsd @@ -465,6 +465,7 @@ + diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_cancel.py b/inbm/dispatcher-agent/tests/unit/sota/test_cancel.py new file mode 100644 index 000000000..d44edd1ea --- /dev/null +++ b/inbm/dispatcher-agent/tests/unit/sota/test_cancel.py @@ -0,0 +1,146 @@ +import unittest +from unittest.mock import Mock +import os +import threading +from time import sleep +from inbm_lib.xmlhandler import XmlHandler + +from dispatcher.sota.cancel import cancel_thread, is_active_ota_sota_download_only +from dispatcher.common.result_constants import Result, CODE_OK, CODE_BAD_REQUEST + +TEST_SCHEMA_LOCATION = os.path.join(os.path.dirname(__file__), + '../../../fpm-template/usr/share/dispatcher-agent/' + 'manifest_schema.xsd') + + +class TestCancelThread(unittest.TestCase): + + def test_cancel_thread_success(self) -> None: + def mock_thread(cancel_event: threading.Event): + while not cancel_event.is_set(): + sleep(3) + + sota_cancel_manifest = """ota
+ sotaremote
update + cancelyes +
""" + + cancel_event = threading.Event() + + type_of_manifest = "ota" + thread_list = [] + worker = threading.Thread(target=mock_thread, args=(cancel_event,)) + worker.setDaemon(True) + thread_list.append(worker) + worker.start() + sleep(1) + parsed_head = XmlHandler(sota_cancel_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + sota_download_only_manifest = """ota
+ sotaremote
update + download-onlyyes +
""" + active_parsed_head = XmlHandler(sota_download_only_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + self.assertTrue(cancel_thread(type_of_manifest=type_of_manifest, + parsed_head=parsed_head, + thread_list=thread_list, + type_of_active_manifest=type_of_manifest, + active_thread_parsed_head=active_parsed_head, + dispatcher_broker=Mock(), + cancel_event=cancel_event)) + + def test_cancel_thread_with_thread_running_with_no_download_mode(self) -> None: + def mock_thread(cancel_event: threading.Event): + while not cancel_event.is_set(): + sleep(3) + + sota_cancel_manifest = """ota
+ sotaremote
update + cancelyes +
""" + + cancel_event = threading.Event() + + type_of_manifest = "ota" + thread_list = [] + worker = threading.Thread(target=mock_thread, args=(cancel_event,)) + worker.setDaemon(True) + thread_list.append(worker) + worker.start() + sleep(1) + parsed_head = XmlHandler(sota_cancel_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + sota_no_download_manifest = """ota
+ sotaremote
update + no-downloadyes +
""" + active_parsed_head = XmlHandler(sota_no_download_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + dispatcher_broker = Mock() + self.assertTrue(cancel_thread(type_of_manifest=type_of_manifest, + parsed_head=parsed_head, + thread_list=thread_list, + type_of_active_manifest=type_of_manifest, + active_thread_parsed_head=active_parsed_head, + dispatcher_broker=dispatcher_broker, + cancel_event=cancel_event)) + + dispatcher_broker.send_result.assert_called_once_with(str(Result(CODE_BAD_REQUEST, + "Current thread is not SOTA download-only. " + "Cannot proceed with the cancel request."))) + + def test_cancel_thread_without_running_thread_manifest(self) -> None: + def mock_thread(cancel_event: threading.Event): + while not cancel_event.is_set(): + sleep(3) + + sota_cancel_manifest = """ota
+ sotaremote
update + cancelyes +
""" + + cancel_event = threading.Event() + + type_of_manifest = "ota" + thread_list = [] + worker = threading.Thread(target=mock_thread, args=(cancel_event,)) + worker.setDaemon(True) + thread_list.append(worker) + worker.start() + sleep(1) + parsed_head = XmlHandler(sota_cancel_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + dispatcher_broker = Mock() + self.assertTrue(cancel_thread(type_of_manifest=type_of_manifest, + parsed_head=parsed_head, + thread_list=thread_list, + type_of_active_manifest=type_of_manifest, + active_thread_parsed_head=None, + dispatcher_broker=dispatcher_broker, + cancel_event=cancel_event)) + + dispatcher_broker.send_result.assert_called_once_with(str(Result(CODE_BAD_REQUEST, "Running thread manifest not found."))) + + def test_is_active_ota_sota_download_only_return_true(self) -> None: + + sota_download_only_manifest = """ota
+ sotaremote
update + download-onlyyes +
""" + + type_of_manifest = "ota" + parsed_head = XmlHandler(sota_download_only_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + self.assertTrue(is_active_ota_sota_download_only(type_of_manifest, parsed_head)) + + def test_is_active_ota_sota_download_only_return_false(self) -> None: + + sota_download_only_manifest = """ota
+ sotaremote
update + no-downloadyes +
""" + + type_of_manifest = "ota" + parsed_head = XmlHandler(sota_download_only_manifest, is_file=False, schema_location=TEST_SCHEMA_LOCATION) + + self.assertFalse(is_active_ota_sota_download_only(type_of_manifest, parsed_head)) diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_downloader.py b/inbm/dispatcher-agent/tests/unit/sota/test_downloader.py index a5c4075b8..b8acf7cfd 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_downloader.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_downloader.py @@ -3,6 +3,7 @@ import shutil from typing import Optional import os +import threading from ..common.mock_resources import * @@ -45,6 +46,7 @@ def setUp(cls) -> None: MockDispatcherBroker.build_mock_dispatcher_broker(), UpdateLogger("SOTA", "metadata"), None, + cancel_event=threading.Event(), install_check_service=MockInstallCheckService()) cls.sota_instance.factory = SotaOsFactory( MockDispatcherBroker.build_mock_dispatcher_broker(), None, []).get_os('YoctoX86_64') @@ -64,7 +66,7 @@ def test_download_successful(self, mock_download, mock_date) -> None: try: installer.download(self.mock_disp_broker, mock_url, TestDownloader._build_mock_repo(0), - self.username, self.password, self.release_date) + self.username, self.password, self.release_date, threading.Event()) except (SotaError, DispatcherException): self.fail("raised Error unexpectedly!") @@ -85,7 +87,7 @@ def test_download_raises(self, mock_download, mock_date) -> None: try: installer.download(self.mock_disp_broker, mock_url, TestDownloader._build_mock_repo(0), - self.username, self.password, self.release_date) + self.username, self.password, self.release_date, threading.Event()) except DispatcherException as e: self.assertRaises(DispatcherException) self.assertEqual(str(e), "foo") @@ -105,7 +107,7 @@ def test_return_false_when_is_valid_release_date_fails(self) -> None: installer.download(self.mock_disp_broker, mock_url, TestDownloader._build_mock_repo( 0), - self.username, self.password, self.release_date) + self.username, self.password, self.release_date, threading.Event()) except SotaError as e: self.assertEqual(str(e), 'Missing manifest Release date field') @@ -139,7 +141,7 @@ def test_tiberos_download_successful(self, mock_download, mock_read_token) -> No try: installer.download(self.mock_disp_broker, mock_url, repo, - self.username, password, self.release_date) + self.username, password, self.release_date, threading.Event()) except (SotaError, DispatcherException): self.fail("raised Error unexpectedly!") finally: @@ -166,6 +168,6 @@ def test_tiberos_download_with_empty_uri(self) -> None: with self.assertRaises(SotaError): installer.download(self.mock_disp_broker, None, repo, - self.username, password, self.release_date) + self.username, password, self.release_date, threading.Event()) finally: shutil.rmtree(directory) diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py index a2f963277..852117589 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_os_updater_nose_style.py @@ -1,6 +1,7 @@ import unittest from typing import Optional import os +import threading from ..common.mock_resources import * from dispatcher.sota.os_factory import SotaOsFactory @@ -45,6 +46,7 @@ def setUpClass(cls) -> None: cls.mock_disp_obj._update_logger, None, MockInstallCheckService(), + cancel_event=threading.Event(), snapshot=1) parsed_manifest_packages = {'resource': cls.resource, @@ -57,6 +59,7 @@ def setUpClass(cls) -> None: cls.mock_disp_obj._update_logger, None, MockInstallCheckService(), + cancel_event=threading.Event(), snapshot=1) def test_create_no_download_cmd_with_no_package_list(self) -> None: diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_sota.py b/inbm/dispatcher-agent/tests/unit/sota/test_sota.py index d971a0f8a..fe7743805 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_sota.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_sota.py @@ -1,3 +1,5 @@ +import threading + import testtools import os import tempfile @@ -46,12 +48,14 @@ def setUpClass(cls) -> None: UpdateLogger("SOTA", "metadata"), None, install_check_service=MockInstallCheckService(), + cancel_event=threading.Event(), snapshot=1) cls.sota_local_instance = SOTA(parsed_manifest, 'local', cls.mock_disp_broker, UpdateLogger("SOTA", "metadata"), None, install_check_service=MockInstallCheckService(), + cancel_event=threading.Event(), snapshot=1) cls.sota_util_instance = SOTAUtil() @@ -126,7 +130,8 @@ def test_run_raises(self, mock_reboot, mock_rollback_and_delete_snap, mock_print sota_instance = SOTA(parsed_manifest, 'remote', mock_disp_broker, UpdateLogger("SOTA", "metadata"), None, - MockInstallCheckService(), snapshot=1) + MockInstallCheckService(), cancel_event=threading.Event(), + snapshot=1) sota_instance.execute(proceed_without_rollback=False, skip_sleeps=True) mock_print.assert_called_once() if TestSota.sota_instance.proceed_without_rollback: @@ -151,7 +156,8 @@ def test_run_pass(self, mock_run, mock_rollback_and_delete_snap, mock_print, mock_disp_broker = MockDispatcherBroker.build_mock_dispatcher_broker() try: sota_instance = SOTA(parsed_manifest, 'remote', mock_disp_broker, - UpdateLogger("SOTA", "metadata"), None, MockInstallCheckService(), snapshot=1) + UpdateLogger("SOTA", "metadata"), None, MockInstallCheckService(), + cancel_event=threading.Event(),snapshot=1) sota_instance.execute(proceed_without_rollback=False, skip_sleeps=True) mock_print.assert_called_once() if TestSota.sota_instance.proceed_without_rollback: diff --git a/inbm/dispatcher-agent/tests/unit/sota/test_tiber_util.py b/inbm/dispatcher-agent/tests/unit/sota/test_tiber_util.py index d157048aa..ad316d658 100644 --- a/inbm/dispatcher-agent/tests/unit/sota/test_tiber_util.py +++ b/inbm/dispatcher-agent/tests/unit/sota/test_tiber_util.py @@ -1,7 +1,10 @@ +import threading import unittest import tempfile import os import shutil +from requests import HTTPError +from requests.exceptions import ProxyError from ..common.mock_resources import * from inbm_common_lib.utility import canonicalize_uri @@ -73,7 +76,8 @@ def setUp(cls) -> None: MockDispatcherBroker.build_mock_dispatcher_broker(), UpdateLogger("SOTA", "metadata"), None, - install_check_service=MockInstallCheckService()) + install_check_service=MockInstallCheckService(), + cancel_event=threading.Event()) cls.sota_instance.factory = SotaOsFactory( MockDispatcherBroker.build_mock_dispatcher_broker(), None, []).get_os('tiber') @@ -82,7 +86,8 @@ def setUp(cls) -> None: @patch('dispatcher.sota.tiber_util.verify_source') def test_download_successful(self, mock_verify_source, mock_read_token, mock_get) -> None: self.release_date = self.username = self.password = None - mock_url = canonicalize_uri("https://registry-rs.internal.ledgepark.intel.com/one-intel-edge/tiberos:latest") + self.cancel_event = threading.Event() + mock_url = canonicalize_uri(" https://files-rs.internal.ledgepark.intel.com/repository/pool/TiberOS/TiberOS-RT/core-rt-1.0.20241001.2251.raw.xz") mock_response = MagicMock() mock_response.status_code = 200 mock_get.return_value = mock_response @@ -94,17 +99,120 @@ def test_download_successful(self, mock_verify_source, mock_read_token, mock_get assert factory installer = factory.create_downloader() assert installer + + directory = tempfile.mkdtemp() + repo = DirectoryRepo(directory) try: installer.download(self.mock_disp_broker, - mock_url, TestDownloader._build_mock_repo(0), - self.username, self.password, self.release_date) + mock_url, repo, + self.username, self.password, self.release_date, self.cancel_event) except (SotaError, DispatcherException): self.fail("raised Error unexpectedly!") + finally: + shutil.rmtree(directory) mock_verify_source.assert_called_once() mock_read_token.assert_called_once() assert mock_get.call_count == 2 + @patch('requests.get') + @patch('dispatcher.sota.downloader.read_release_server_token', return_value="mock_password") + @patch('dispatcher.sota.tiber_util.verify_source') + def test_download_failed_with_HTTPError(self, mock_verify_source, mock_read_token, mock_get) -> None: + self.release_date = self.username = self.password = None + self.cancel_event = threading.Event() + self.cancel_event.set() + mock_url = canonicalize_uri(" http://files-rs.internal.ledgepark.intel.com/repository/pool/TiberOS/TiberOS-RT/core-rt-1.0.20241001.2251.raw.xz") + mock_response = MagicMock() + mock_response.status_code = 400 + mock_get.side_effect = HTTPError + + assert TestDownloader.sota_instance + TestDownloader.sota_instance.factory = SotaOsFactory( + MockDispatcherBroker.build_mock_dispatcher_broker(), None, []).get_os('tiber') + factory = TestDownloader.sota_instance.factory + assert factory + installer = factory.create_downloader() + assert installer + + directory = tempfile.mkdtemp() + repo = DirectoryRepo(directory) + try: + with self.assertRaises(SotaError): + installer.download(self.mock_disp_broker, + mock_url, repo, + self.username, self.password, self.release_date, self.cancel_event) + finally: + shutil.rmtree(directory) + + mock_verify_source.assert_called_once() + mock_read_token.assert_called_once() + assert mock_get.call_count == 1 + + @patch('requests.get') + @patch('dispatcher.sota.downloader.read_release_server_token', return_value="mock_password") + @patch('dispatcher.sota.tiber_util.verify_source') + def test_download_failed_with_ProxyError(self, mock_verify_source, mock_read_token, mock_get) -> None: + self.release_date = self.username = self.password = None + self.cancel_event = threading.Event() + self.cancel_event.set() + mock_url = canonicalize_uri(" http://files-rs.internal.ledgepark.intel.com/repository/pool/TiberOS/TiberOS-RT/core-rt-1.0.20241001.2251.raw.xz") + mock_response = MagicMock() + mock_response.status_code = 400 + mock_get.side_effect = ProxyError + + assert TestDownloader.sota_instance + TestDownloader.sota_instance.factory = SotaOsFactory( + MockDispatcherBroker.build_mock_dispatcher_broker(), None, []).get_os('tiber') + factory = TestDownloader.sota_instance.factory + assert factory + installer = factory.create_downloader() + assert installer + + directory = tempfile.mkdtemp() + repo = DirectoryRepo(directory) + try: + with self.assertRaises(SotaError): + installer.download(self.mock_disp_broker, + mock_url, repo, + self.username, self.password, self.release_date, self.cancel_event) + finally: + shutil.rmtree(directory) + + mock_verify_source.assert_called_once() + mock_read_token.assert_called_once() + assert mock_get.call_count == 1 + + @patch('requests.get') + @patch('dispatcher.sota.downloader.read_release_server_token', return_value="mock_password") + @patch('dispatcher.sota.tiber_util.verify_source') + def test_download_failed_with_non_exist_repo(self, mock_verify_source, mock_read_token, mock_get) -> None: + self.release_date = self.username = self.password = None + self.cancel_event = threading.Event() + self.cancel_event.set() + mock_url = canonicalize_uri( + " http://files-rs.internal.ledgepark.intel.com/repository/pool/TiberOS/TiberOS-RT/core-rt-1.0.20241001.2251.raw.xz") + mock_response = MagicMock() + mock_response.status_code = 400 + + assert TestDownloader.sota_instance + TestDownloader.sota_instance.factory = SotaOsFactory( + MockDispatcherBroker.build_mock_dispatcher_broker(), None, []).get_os('tiber') + factory = TestDownloader.sota_instance.factory + assert factory + installer = factory.create_downloader() + assert installer + + repo = DirectoryRepo(CACHE) + with self.assertRaises(SotaError): + installer.download(self.mock_disp_broker, + mock_url, repo, + self.username, self.password, self.release_date, self.cancel_event) + + mock_verify_source.assert_called_once() + mock_read_token.assert_called_once() + assert mock_get.call_count == 1 + @staticmethod def _build_mock_repo(num_files=0): mem_repo = MemoryRepo(CACHE) diff --git a/inbm/dispatcher-agent/tests/unit/test_ota_factory.py b/inbm/dispatcher-agent/tests/unit/test_ota_factory.py index a25b4e3bc..22af86a69 100644 --- a/inbm/dispatcher-agent/tests/unit/test_ota_factory.py +++ b/inbm/dispatcher-agent/tests/unit/test_ota_factory.py @@ -1,3 +1,5 @@ +import threading + import pytest from unit.common.mock_resources import * @@ -28,7 +30,8 @@ def test_get_factory(ota_type, expected_factory, mock_disp_obj, mock_disp_broker None, MockInstallCheckService(), UpdateLogger(ota_type=ota_type, data="metadata"), - ConfigDbs.ON + ConfigDbs.ON, + cancel_event=threading.Event() ) assert isinstance(factory, expected_factory) @@ -43,7 +46,8 @@ def test_raise_error_unsupported_ota(mock_disp_obj, mock_disp_broker) -> None: None, MockInstallCheckService(), UpdateLogger(ota_type="IOTA", data="metadata"), - ConfigDbs.OFF + ConfigDbs.OFF, + cancel_event=threading.Event() ) @@ -61,7 +65,8 @@ def test_create_parser(ota_type, expected_parser, mock_disp_obj, mock_disp_broke None, MockInstallCheckService(), UpdateLogger(ota_type=ota_type, data="metadata"), - ConfigDbs.ON + ConfigDbs.ON, + cancel_event=threading.Event() ).create_parser() assert isinstance(parser, expected_parser) @@ -80,6 +85,7 @@ def test_create_thread(ota_type, expected_thread, mock_disp_obj, mock_disp_broke None, MockInstallCheckService(), UpdateLogger(ota_type=ota_type, data="metadata"), - ConfigDbs.ON + ConfigDbs.ON, + cancel_event=threading.Event() ).create_thread({'abc': 'def'}) assert isinstance(thread, expected_thread)