Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[NEXMANAGE-737] Sota cancel mode with threading event method #567

Merged
merged 7 commits into from
Oct 18, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion inbc-program/inbc/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def parse_sota_args(self) -> None:
parser_sota.add_argument('--reboot', '-rb', default='yes', required=False, choices=['yes', 'no'],
help='Type of information [ yes | no ]')
parser_sota.add_argument('--mode', '-m', default='full',
required=False, choices=['full', 'download-only', 'no-download'])
required=False, choices=['full', 'download-only', 'no-download', 'cancel'])
parser_sota.add_argument('--package-list', '-p', required=False,
type=lambda x: validate_package_list(x),
help='Comma-separated list of package names to install')
Expand Down
3 changes: 3 additions & 0 deletions inbm/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## NEXT - MMMM-DD-YY
### Added
- (NEXMANAGE-737) Enable sota cancel mode

### Fixed
- (NEXMANAGE-872) Fix provision-tc issue in TiberOS - cannot overwrite /etc/dispatcher.environment

Expand Down
48 changes: 44 additions & 4 deletions inbm/dispatcher-agent/dispatcher/dispatcher_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import signal
import sys
from queue import Queue
from threading import Thread, active_count, Lock
from threading import Thread, active_count, Lock, Event
from time import sleep
from typing import Optional, Any, Mapping, Tuple, Sequence

Expand Down Expand Up @@ -55,6 +55,7 @@
from .sota.os_factory import SotaOsFactory
from .sota.sota import SOTA
from .sota.sota_error import SotaError
from .sota.cancel import cancel_thread
from .workload_orchestration import WorkloadOrchestration
from inbm_lib.xmlhandler import *
from inbm_lib.version import get_friendly_inbm_version_commit
Expand Down Expand Up @@ -94,6 +95,8 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv
# Initialize update_queue with a capacity of 1 to ensure serialized handling of updates.
self.update_queue: Queue[Tuple[str, str, Optional[str]]] = Queue(1)
self._thread_count = 1
self._thread_list: list[Thread] = []
self._active_thread_manifest: Optional[str] = None
self._sota_repos = None
self.sota_mode = None
self._package_list: str = ""
Expand All @@ -115,6 +118,7 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv

self.sqlite_mgr = SqliteManager()
self.ap_scheduler = APScheduler(sqlite_mgr=self.sqlite_mgr)
self._cancel_event = Event()

def stop(self) -> None:
self.RUNNING = False
Expand Down Expand Up @@ -186,7 +190,13 @@ def _sig_handler(signo, frame) -> None:
if active_count() - active_start_count < self._thread_count:
worker = Thread(target=handle_updates, args=(self,))
worker.setDaemon(True)
self._thread_list.append(worker)
worker.start()

# Periodically check if processes have finished. If process finished, remove it from the list.
for thread in self._thread_list[:]:
if not thread.is_alive():
self._thread_list.remove(thread)
yengliong93 marked this conversation as resolved.
Show resolved Hide resolved
sleep(1)

self._dispatcher_broker.mqtt_publish(f'{AGENT}/state', 'dead', retain=True)
Expand Down Expand Up @@ -310,6 +320,8 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st
result: Result = Result()
logger.debug("do_install")
parsed_head = None
# Assumption is that there is only one active OTA thread at a time
self._active_thread_manifest = xml
gblewis1 marked this conversation as resolved.
Show resolved Hide resolved
try: # TODO: Split into multiple try/except blocks
type_of_manifest, parsed_head = \
_check_type_validate_manifest(xml, schema_location=schema_location)
Expand Down Expand Up @@ -411,7 +423,8 @@ def _do_ota_update(self, ota_type: str, repo_type: str, resource: dict,
self._sota_repos,
self._install_check_service,
self._update_logger,
self.config_dbs)
self.config_dbs,
self._cancel_event)

p = factory.create_parser()
# NOTE: p.parse can raise one of the *otaError exceptions
Expand Down Expand Up @@ -440,7 +453,8 @@ def _validate_pota_manifest(self, repo_type: str,
self._sota_repos,
self._install_check_service,
self._update_logger,
self.config_dbs)
self.config_dbs,
self._cancel_event)
p = factory.create_parser()
# NOTE: p.parse can raise one of the *otaError exceptions
parsed_manifest = p.parse(ota_list[ota], kwargs, parsed_head)
Expand Down Expand Up @@ -497,7 +511,32 @@ def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None:
request_type = topic.split('/')[2]
request_id = topic.split('/')[3] if len(topic.split('/')) > 3 else None
manifest = payload
self._add_request_to_queue(request_type, manifest, request_id)
if not self._handle_cancel_request(request_type, manifest):
self._add_request_to_queue(request_type, manifest, request_id)

def _handle_cancel_request(self, request_type: str, manifest: str) -> bool:
"""
Check if it is a SOTA cancel request. If it is, send the terminate signal to current process.

@param request_type: type of the request
@param manifest: manifest to be processed
@return: True if the request has been processed; False if no request has been handled.
"""
if request_type == "install":
type_of_manifest, parsed_head = \
_check_type_validate_manifest(manifest)
type_of_active_manifest = active_thread_parsed_head = None
if self._active_thread_manifest:
type_of_active_manifest, active_thread_parsed_head = \
_check_type_validate_manifest(self._active_thread_manifest)
result = cancel_thread(type_of_manifest, parsed_head, self._thread_list,
type_of_active_manifest, active_thread_parsed_head,
self._dispatcher_broker, self._cancel_event)
if result:
logger.debug(f"Request cancel complete.")
self._send_result(str(Result(CODE_OK, "Request complete.")))
return True
return False

def _on_message(self, topic: str, payload: Any, qos: int) -> None:
"""Called when a message is received from _telemetry-agent
Expand Down Expand Up @@ -619,6 +658,7 @@ def invoke_sota(self, snapshot: Optional[Any] = None, action: Optional[Any] = No
self._update_logger,
self._sota_repos,
self._install_check_service,
self._cancel_event,
snapshot, action)

sota_instance.execute(self.proceed_without_rollback)
Expand Down
17 changes: 12 additions & 5 deletions inbm/dispatcher-agent/dispatcher/ota_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import abc
import logging
import threading
from typing import Any, Optional, Mapping

from .config_dbs import ConfigDbs
Expand Down Expand Up @@ -53,7 +54,8 @@ def get_factory(ota_type,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
update_logger: UpdateLogger,
dbs: ConfigDbs) -> "OtaFactory":
dbs: ConfigDbs,
cancel_event: threading.Event) -> "OtaFactory":
"""Create an OTA factory of a specified OTA type

@param ota_type: The OTA type
Expand All @@ -64,6 +66,7 @@ def get_factory(ota_type,
@param install_check_service: provides install_check
@param update_logger: UpdateLogger (expected to update after OTA)
@param dbs: ConfigDbs.ON or ConfigDbs.WARN or ConfigDbs.OFF
@param cancel_event: Event used to stop the downloading process
@raise ValueError: Unsupported OTA type
"""

Expand All @@ -72,7 +75,7 @@ def get_factory(ota_type,
return FotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger)
if ota_type == OtaType.SOTA.name:
return SotaFactory(repo_type, dispatcher_broker, proceed_without_rollback,
sota_repos, install_check_service, update_logger)
sota_repos, install_check_service, update_logger, cancel_event)
if ota_type == OtaType.AOTA.name:
return AotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger, dbs=dbs)
if ota_type == OtaType.POTA.name:
Expand Down Expand Up @@ -116,7 +119,8 @@ class SotaFactory(OtaFactory):
@param proceed_without_rollback: Is it OK to run SOTA without rollback ability?
@param install_check_service: provides InstallCheckService
@param sota_repos: new Ubuntu/Debian mirror (or None)
@param update_logger: UpdateLogger (expected to update after OTA)
@param update_logger: UpdateLogger (expected to update after OTA)
@param cancel_event: Event used to stop the downloading process
"""

def __init__(self,
Expand All @@ -125,13 +129,15 @@ def __init__(self,
proceed_without_rollback: bool,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
update_logger: UpdateLogger) -> None:
update_logger: UpdateLogger,
cancel_event: threading.Event) -> None:

super().__init__(repo_type, install_check_service)
self._sota_repos = sota_repos
self._proceed_without_rollback = proceed_without_rollback
self._update_logger = update_logger
self._dispatcher_broker = dispatcher_broker
self._cancel_event = cancel_event

def create_parser(self) -> OtaParser:
logger.debug(" ")
Expand All @@ -145,7 +151,8 @@ def create_thread(self, parsed_manifest: Mapping[str, Optional[Any]]) -> OtaThre
self._sota_repos,
self._install_check_service,
parsed_manifest,
self._update_logger)
self._update_logger,
self._cancel_event)


class AotaFactory(OtaFactory):
Expand Down
8 changes: 7 additions & 1 deletion inbm/dispatcher-agent/dispatcher/ota_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import abc
import logging
import os
import threading
from threading import Lock
from typing import Optional, Any, Mapping

Expand Down Expand Up @@ -142,6 +143,7 @@ class SotaThread(OtaThread):
@param install_check_service: provides install_check
@param parsed_manifest: parameters from OTA manifest
@param update_logger: UpdateLogger instance; expected to update when done with OTA
@param cancel_event: Event used to stop the downloading process
@return (dict): dict representation of COMMAND_SUCCESS or OTA_FAILURE/OTA_FAILURE_IN_PROGRESS
"""

Expand All @@ -152,13 +154,15 @@ def __init__(self,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
parsed_manifest: Mapping[str, Optional[Any]],
update_logger: UpdateLogger) -> None:
update_logger: UpdateLogger,
cancel_event: threading.Event) -> None:
super().__init__(repo_type, parsed_manifest,
install_check_service=install_check_service)
self._sota_repos = sota_repos
self._proceed_without_rollback = proceed_without_rollback
self._update_logger = update_logger
self._dispatcher_broker = dispatcher_broker
self._cancel_event = cancel_event

def start(self) -> Result: # pragma: no cover
"""Starts the SOTA thread and which checks for existing locks before delegating to
Expand All @@ -177,6 +181,7 @@ def start(self) -> Result: # pragma: no cover
dispatcher_broker=self._dispatcher_broker,
update_logger=self._update_logger,
sota_repos=self._sota_repos,
cancel_event=self._cancel_event,
install_check_service=self._install_check_service)
try:
sota_instance.execute(self._proceed_without_rollback)
Expand All @@ -200,6 +205,7 @@ def check(self) -> None:
dispatcher_broker=self._dispatcher_broker,
update_logger=self._update_logger,
sota_repos=self._sota_repos,
cancel_event=self._cancel_event,
install_check_service=self._install_check_service)
sota_instance.check()
except SotaError as e:
Expand Down
88 changes: 88 additions & 0 deletions inbm/dispatcher-agent/dispatcher/sota/cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
Method to handle cancel request

Copyright (C) 2017-2024 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""

import logging
from typing import Optional
from threading import Event
from inbm_lib.xmlhandler import XmlHandler
from threading import Thread
from .constants import SOTA_CACHE
from ..constants import OtaType
from dispatcher.dispatcher_exception import DispatcherException
from dispatcher.packagemanager.local_repo import DirectoryRepo
from dispatcher.dispatcher_broker import DispatcherBroker
from dispatcher.common.result_constants import Result, CODE_OK, CODE_BAD_REQUEST

logger = logging.getLogger(__name__)


def cancel_thread(type_of_manifest: str, parsed_head: XmlHandler, thread_list: list[Thread],
type_of_active_manifest: Optional[str], active_thread_parsed_head: Optional[XmlHandler],
dispatcher_broker: DispatcherBroker, cancel_event: Event) -> bool:
"""
Cancel the current active thread by sending the terminate signal.

@param type_of_manifest: type of the request
@param parsed_head: The root parsed xml
@param thread_list: List of the active thread
@param type_of_active_manifest: type of the request on running thread
@param active_thread_parsed_head: The root parsed xml of running thread
@param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services
@param cancel_event: Event used to stop the downloading process
@return: True if the request has been processed; False if no request has been handled.
"""
if type_of_manifest == 'ota':
header = parsed_head.get_children('ota/header')
ota_type = header['type']
resource = parsed_head.get_children(f'ota/type/{ota_type}')
if ota_type == OtaType.SOTA.name.lower():
sota_mode = resource.get('mode', None)
if sota_mode == 'cancel':
logger.debug(f"Receive sota cancel request.")
# If the active thread is not SOTA download-only, forbid the cancel request.
if type_of_active_manifest and active_thread_parsed_head:
if not is_active_ota_sota_download_only(type_of_active_manifest, active_thread_parsed_head):
dispatcher_broker.send_result(
str(Result(CODE_BAD_REQUEST, "Current thread is not SOTA download-only. "
gblewis1 marked this conversation as resolved.
Show resolved Hide resolved
"Cannot proceed with the cancel request.")))
return True
gblewis1 marked this conversation as resolved.
Show resolved Hide resolved
else:
dispatcher_broker.send_result(str(Result(CODE_BAD_REQUEST, "Running thread manifest not found.")))
return True

# The list should only contain one OTA process.
for thread in thread_list:
if thread.is_alive():
cancel_event.set()
yengliong93 marked this conversation as resolved.
Show resolved Hide resolved
# Wait thread to gracefully exit
logger.debug(f"Waiting thread to exit...")
thread.join()
yengliong93 marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(f"Request cancel complete.")
# Reset the event flag
cancel_event.clear()
return True
return False


def is_active_ota_sota_download_only(type_of_active_manifest: str, active_parsed_head: XmlHandler) -> bool:
"""
Check whether the current active thread is SOTA download-only mode.

@param type_of_active_manifest: type of the request
@param active_parsed_head: The root parsed xml
@return: True if it is SOTA download-only; False if not.
"""
logger.debug("")
if type_of_active_manifest == 'ota':
header = active_parsed_head.get_children('ota/header')
ota_type = header['type']
resource = active_parsed_head.get_children(f'ota/type/{ota_type}')
if ota_type == OtaType.SOTA.name.lower():
sota_mode = resource.get('mode', None)
if sota_mode == 'download-only':
return True
return False
Loading