Skip to content

Commit

Permalink
[NEXMANAGE-737] Sota cancel mode with threading event method (#567)
Browse files Browse the repository at this point in the history
* [NEXMANAGE-737] Enable sota cancel mode.

This PR implements the sota cancel mode using a threading event flag. When the thread is created, it will be added to a thread list.
When the dispatcher receives the sota cancel request, it checks the current running thread and retrieves its sota type. If it is a download-only sota, the dispatcher sets the event flag to issue a cancel request.
Please note that the first inbc command should be terminated before sending inbc cancel command as they will be sharing the same mqtt connection.

Signed-off-by: yengliong <[email protected]>

* Cancel request in the middle of download

* Fix unit test

* Add unit tests to increase coverage

* Add timeout in thread.join

---------

Signed-off-by: yengliong <[email protected]>
  • Loading branch information
yengliong93 authored Oct 18, 2024
1 parent 4bfbbd0 commit 5426e6c
Show file tree
Hide file tree
Showing 16 changed files with 497 additions and 42 deletions.
2 changes: 1 addition & 1 deletion inbc-program/inbc/parser/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def parse_sota_args(self) -> None:
parser_sota.add_argument('--reboot', '-rb', default='yes', required=False, choices=['yes', 'no'],
help='Type of information [ yes | no ]')
parser_sota.add_argument('--mode', '-m', default='full',
required=False, choices=['full', 'download-only', 'no-download'])
required=False, choices=['full', 'download-only', 'no-download', 'cancel'])
parser_sota.add_argument('--package-list', '-p', required=False,
type=lambda x: validate_package_list(x),
help='Comma-separated list of package names to install')
Expand Down
3 changes: 3 additions & 0 deletions inbm/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/).

## NEXT - MMMM-DD-YY
### Added
- (NEXMANAGE-737) Enable sota cancel mode

### Fixed
- (NEXMANAGE-872) Fix provision-tc issue in TiberOS - cannot overwrite /etc/dispatcher.environment
- (NEXMANAGE-846) Fix granular log raise error when granular log file is empty
Expand Down
48 changes: 44 additions & 4 deletions inbm/dispatcher-agent/dispatcher/dispatcher_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import signal
import sys
from queue import Queue
from threading import Thread, active_count, Lock
from threading import Thread, active_count, Lock, Event
from time import sleep
from typing import Optional, Any, Mapping, Tuple, Sequence

Expand Down Expand Up @@ -55,6 +55,7 @@
from .sota.os_factory import SotaOsFactory
from .sota.sota import SOTA
from .sota.sota_error import SotaError
from .sota.cancel import cancel_thread
from .workload_orchestration import WorkloadOrchestration
from inbm_lib.xmlhandler import *
from inbm_lib.version import get_friendly_inbm_version_commit
Expand Down Expand Up @@ -94,6 +95,8 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv
# Initialize update_queue with a capacity of 1 to ensure serialized handling of updates.
self.update_queue: Queue[Tuple[str, str, Optional[str]]] = Queue(1)
self._thread_count = 1
self._thread_list: list[Thread] = []
self._active_thread_manifest: Optional[str] = None
self._sota_repos = None
self.sota_mode = None
self._package_list: str = ""
Expand All @@ -115,6 +118,7 @@ def __init__(self, args: list[str], broker: DispatcherBroker, install_check_serv

self.sqlite_mgr = SqliteManager()
self.ap_scheduler = APScheduler(sqlite_mgr=self.sqlite_mgr)
self._cancel_event = Event()

def stop(self) -> None:
self.RUNNING = False
Expand Down Expand Up @@ -186,7 +190,13 @@ def _sig_handler(signo, frame) -> None:
if active_count() - active_start_count < self._thread_count:
worker = Thread(target=handle_updates, args=(self,))
worker.setDaemon(True)
self._thread_list.append(worker)
worker.start()

# Periodically check if processes have finished. If process finished, remove it from the list.
for thread in self._thread_list[:]:
if not thread.is_alive():
self._thread_list.remove(thread)
sleep(1)

self._dispatcher_broker.mqtt_publish(f'{AGENT}/state', 'dead', retain=True)
Expand Down Expand Up @@ -310,6 +320,8 @@ def do_install(self, xml: str, schema_location: Optional[str] = None, job_id: st
result: Result = Result()
logger.debug("do_install")
parsed_head = None
# Assumption is that there is only one active OTA thread at a time
self._active_thread_manifest = xml
try: # TODO: Split into multiple try/except blocks
type_of_manifest, parsed_head = \
_check_type_validate_manifest(xml, schema_location=schema_location)
Expand Down Expand Up @@ -411,7 +423,8 @@ def _do_ota_update(self, ota_type: str, repo_type: str, resource: dict,
self._sota_repos,
self._install_check_service,
self._update_logger,
self.config_dbs)
self.config_dbs,
self._cancel_event)

p = factory.create_parser()
# NOTE: p.parse can raise one of the *otaError exceptions
Expand Down Expand Up @@ -440,7 +453,8 @@ def _validate_pota_manifest(self, repo_type: str,
self._sota_repos,
self._install_check_service,
self._update_logger,
self.config_dbs)
self.config_dbs,
self._cancel_event)
p = factory.create_parser()
# NOTE: p.parse can raise one of the *otaError exceptions
parsed_manifest = p.parse(ota_list[ota], kwargs, parsed_head)
Expand Down Expand Up @@ -497,7 +511,32 @@ def _on_cloud_request(self, topic: str, payload: str, qos: int) -> None:
request_type = topic.split('/')[2]
request_id = topic.split('/')[3] if len(topic.split('/')) > 3 else None
manifest = payload
self._add_request_to_queue(request_type, manifest, request_id)
if not self._handle_cancel_request(request_type, manifest):
self._add_request_to_queue(request_type, manifest, request_id)

def _handle_cancel_request(self, request_type: str, manifest: str) -> bool:
"""
Check if it is a SOTA cancel request. If it is, send the terminate signal to current process.
@param request_type: type of the request
@param manifest: manifest to be processed
@return: True if the request has been processed; False if no request has been handled.
"""
if request_type == "install":
type_of_manifest, parsed_head = \
_check_type_validate_manifest(manifest)
type_of_active_manifest = active_thread_parsed_head = None
if self._active_thread_manifest:
type_of_active_manifest, active_thread_parsed_head = \
_check_type_validate_manifest(self._active_thread_manifest)
result = cancel_thread(type_of_manifest, parsed_head, self._thread_list,
type_of_active_manifest, active_thread_parsed_head,
self._dispatcher_broker, self._cancel_event)
if result:
logger.debug(f"Request cancel complete.")
self._send_result(str(Result(CODE_OK, "Request complete.")))
return True
return False

def _on_message(self, topic: str, payload: Any, qos: int) -> None:
"""Called when a message is received from _telemetry-agent
Expand Down Expand Up @@ -619,6 +658,7 @@ def invoke_sota(self, snapshot: Optional[Any] = None, action: Optional[Any] = No
self._update_logger,
self._sota_repos,
self._install_check_service,
self._cancel_event,
snapshot, action)

sota_instance.execute(self.proceed_without_rollback)
Expand Down
17 changes: 12 additions & 5 deletions inbm/dispatcher-agent/dispatcher/ota_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""
import abc
import logging
import threading
from typing import Any, Optional, Mapping

from .config_dbs import ConfigDbs
Expand Down Expand Up @@ -53,7 +54,8 @@ def get_factory(ota_type,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
update_logger: UpdateLogger,
dbs: ConfigDbs) -> "OtaFactory":
dbs: ConfigDbs,
cancel_event: threading.Event) -> "OtaFactory":
"""Create an OTA factory of a specified OTA type
@param ota_type: The OTA type
Expand All @@ -64,6 +66,7 @@ def get_factory(ota_type,
@param install_check_service: provides install_check
@param update_logger: UpdateLogger (expected to update after OTA)
@param dbs: ConfigDbs.ON or ConfigDbs.WARN or ConfigDbs.OFF
@param cancel_event: Event used to stop the downloading process
@raise ValueError: Unsupported OTA type
"""

Expand All @@ -72,7 +75,7 @@ def get_factory(ota_type,
return FotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger)
if ota_type == OtaType.SOTA.name:
return SotaFactory(repo_type, dispatcher_broker, proceed_without_rollback,
sota_repos, install_check_service, update_logger)
sota_repos, install_check_service, update_logger, cancel_event)
if ota_type == OtaType.AOTA.name:
return AotaFactory(repo_type, dispatcher_broker, install_check_service, update_logger, dbs=dbs)
if ota_type == OtaType.POTA.name:
Expand Down Expand Up @@ -116,7 +119,8 @@ class SotaFactory(OtaFactory):
@param proceed_without_rollback: Is it OK to run SOTA without rollback ability?
@param install_check_service: provides InstallCheckService
@param sota_repos: new Ubuntu/Debian mirror (or None)
@param update_logger: UpdateLogger (expected to update after OTA)
@param update_logger: UpdateLogger (expected to update after OTA)
@param cancel_event: Event used to stop the downloading process
"""

def __init__(self,
Expand All @@ -125,13 +129,15 @@ def __init__(self,
proceed_without_rollback: bool,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
update_logger: UpdateLogger) -> None:
update_logger: UpdateLogger,
cancel_event: threading.Event) -> None:

super().__init__(repo_type, install_check_service)
self._sota_repos = sota_repos
self._proceed_without_rollback = proceed_without_rollback
self._update_logger = update_logger
self._dispatcher_broker = dispatcher_broker
self._cancel_event = cancel_event

def create_parser(self) -> OtaParser:
logger.debug(" ")
Expand All @@ -145,7 +151,8 @@ def create_thread(self, parsed_manifest: Mapping[str, Optional[Any]]) -> OtaThre
self._sota_repos,
self._install_check_service,
parsed_manifest,
self._update_logger)
self._update_logger,
self._cancel_event)


class AotaFactory(OtaFactory):
Expand Down
8 changes: 7 additions & 1 deletion inbm/dispatcher-agent/dispatcher/ota_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import abc
import logging
import os
import threading
from threading import Lock
from typing import Optional, Any, Mapping

Expand Down Expand Up @@ -142,6 +143,7 @@ class SotaThread(OtaThread):
@param install_check_service: provides install_check
@param parsed_manifest: parameters from OTA manifest
@param update_logger: UpdateLogger instance; expected to update when done with OTA
@param cancel_event: Event used to stop the downloading process
@return (dict): dict representation of COMMAND_SUCCESS or OTA_FAILURE/OTA_FAILURE_IN_PROGRESS
"""

Expand All @@ -152,13 +154,15 @@ def __init__(self,
sota_repos: Optional[str],
install_check_service: InstallCheckService,
parsed_manifest: Mapping[str, Optional[Any]],
update_logger: UpdateLogger) -> None:
update_logger: UpdateLogger,
cancel_event: threading.Event) -> None:
super().__init__(repo_type, parsed_manifest,
install_check_service=install_check_service)
self._sota_repos = sota_repos
self._proceed_without_rollback = proceed_without_rollback
self._update_logger = update_logger
self._dispatcher_broker = dispatcher_broker
self._cancel_event = cancel_event

def start(self) -> Result: # pragma: no cover
"""Starts the SOTA thread and which checks for existing locks before delegating to
Expand All @@ -177,6 +181,7 @@ def start(self) -> Result: # pragma: no cover
dispatcher_broker=self._dispatcher_broker,
update_logger=self._update_logger,
sota_repos=self._sota_repos,
cancel_event=self._cancel_event,
install_check_service=self._install_check_service)
try:
sota_instance.execute(self._proceed_without_rollback)
Expand All @@ -200,6 +205,7 @@ def check(self) -> None:
dispatcher_broker=self._dispatcher_broker,
update_logger=self._update_logger,
sota_repos=self._sota_repos,
cancel_event=self._cancel_event,
install_check_service=self._install_check_service)
sota_instance.check()
except SotaError as e:
Expand Down
88 changes: 88 additions & 0 deletions inbm/dispatcher-agent/dispatcher/sota/cancel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""
Method to handle cancel request
Copyright (C) 2017-2024 Intel Corporation
SPDX-License-Identifier: Apache-2.0
"""

import logging
from typing import Optional
from threading import Event
from inbm_lib.xmlhandler import XmlHandler
from threading import Thread
from .constants import SOTA_CACHE
from ..constants import OtaType
from dispatcher.dispatcher_exception import DispatcherException
from dispatcher.packagemanager.local_repo import DirectoryRepo
from dispatcher.dispatcher_broker import DispatcherBroker
from dispatcher.common.result_constants import Result, CODE_OK, CODE_BAD_REQUEST

logger = logging.getLogger(__name__)


def cancel_thread(type_of_manifest: str, parsed_head: XmlHandler, thread_list: list[Thread],
type_of_active_manifest: Optional[str], active_thread_parsed_head: Optional[XmlHandler],
dispatcher_broker: DispatcherBroker, cancel_event: Event) -> bool:
"""
Cancel the current active thread by sending the terminate signal.
@param type_of_manifest: type of the request
@param parsed_head: The root parsed xml
@param thread_list: List of the active thread
@param type_of_active_manifest: type of the request on running thread
@param active_thread_parsed_head: The root parsed xml of running thread
@param dispatcher_broker: DispatcherBroker object used to communicate with other INBM services
@param cancel_event: Event used to stop the downloading process
@return: True if the request has been processed; False if no request has been handled.
"""
if type_of_manifest == 'ota':
header = parsed_head.get_children('ota/header')
ota_type = header['type']
resource = parsed_head.get_children(f'ota/type/{ota_type}')
if ota_type == OtaType.SOTA.name.lower():
sota_mode = resource.get('mode', None)
if sota_mode == 'cancel':
logger.debug(f"Receive sota cancel request.")
# If the active thread is not SOTA download-only, forbid the cancel request.
if type_of_active_manifest and active_thread_parsed_head:
if not is_active_ota_sota_download_only(type_of_active_manifest, active_thread_parsed_head):
dispatcher_broker.send_result(
str(Result(CODE_BAD_REQUEST, "Current thread is not SOTA download-only. "
"Cannot proceed with the cancel request.")))
return True
else:
dispatcher_broker.send_result(str(Result(CODE_BAD_REQUEST, "Running thread manifest not found.")))
return True

# The list should only contain one OTA process.
for thread in thread_list:
if thread.is_alive():
cancel_event.set()
# Wait thread to gracefully exit
logger.debug(f"Waiting thread to exit...")
thread.join(timeout=300)
logger.debug(f"Request cancel complete.")
# Reset the event flag
cancel_event.clear()
return True
return False


def is_active_ota_sota_download_only(type_of_active_manifest: str, active_parsed_head: XmlHandler) -> bool:
"""
Check whether the current active thread is SOTA download-only mode.
@param type_of_active_manifest: type of the request
@param active_parsed_head: The root parsed xml
@return: True if it is SOTA download-only; False if not.
"""
logger.debug("")
if type_of_active_manifest == 'ota':
header = active_parsed_head.get_children('ota/header')
ota_type = header['type']
resource = active_parsed_head.get_children(f'ota/type/{ota_type}')
if ota_type == OtaType.SOTA.name.lower():
sota_mode = resource.get('mode', None)
if sota_mode == 'download-only':
return True
return False
Loading

0 comments on commit 5426e6c

Please sign in to comment.