Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: [NEXMANAGE-737] Enable sota cancel mode. #559

Closed
wants to merge 3 commits into from

Conversation

yengliong93
Copy link
Contributor

PULL DESCRIPTION

This PR implements the sota cancel mode. When the thread is created, it will be added to a thread list.
When the dispatcher receives the sota cancel request, the dispatcher checks the current running thread and retrieves its id. Next, it sends a termination signal to the thread.

Impact Analysis

Info Please fill out this column
Root Cause Specifically for bugs, empty in case of no variants
Jira ticket Add the name to the Jira ticket eg: "NEXMANAGE-622". Automation will do the linking to Jira

CODE MAINTAINABILITY

  • Added required new tests relevant to the changes
  • Updated Documentation as relevant to the changes
  • PR change contains code related to security
  • PR introduces changes that break compatibility with other modules/services (If YES, please provide description)
  • Run go fmt or format-python.sh as applicable
  • Update Changelog
  • Integration tests are passing
  • If Cloudadapter changes, check Azure connectivity manually

Code must act as a teacher for future developers

This PR implements the sota cancel mode. When the thread is created, it
will be added  to a thread list.
When the dispatcher receives the sota cancel request, the dispatcher
checks the current running thread and retrieves its id. Next, it sends a
termination signal to the thread.

Signed-off-by: yengliong <[email protected]>
@gblewis1
Copy link
Contributor

gblewis1 commented Oct 2, 2024

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

@yengliong93
Copy link
Contributor Author

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

Hi @gblewis1, the current implementation will terminate any OTA that's running on the active thread. I'm thinking whether we should check the OTA type and only terminate the thread if it's the SOTA download-only mode.

@gblewis1
Copy link
Contributor

gblewis1 commented Oct 3, 2024

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

Hi @gblewis1, the current implementation will terminate any OTA that's running on the active thread. I'm thinking whether we should check the OTA type and only terminate the thread if it's the SOTA download-only mode.

another consideration is, there may be an inbc process waiting on the thread to report back to it, so we want to give the thread a chance to clean up gracefully.

here's an example of sending a signal to a thread that's downloading a file

import threading
import requests
import time
import os

def download_file(url, destination, stop_event, chunk_size=1024):
    """
    Downloads a file from the given URL to the destination path.
    Periodically checks if a stop_event is set to cancel the download gracefully.
    
    :param url: URL of the file to download
    :param destination: Local path to save the downloaded file
    :param stop_event: threading.Event to signal cancellation
    :param chunk_size: Size of each chunk to read (in bytes)
    """
    try:
        with requests.get(url, stream=True, timeout=10) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            total_length = response.headers.get('content-length')

            if total_length is None:
                print("Unable to determine the file size.")
                total_length = 0
            else:
                total_length = int(total_length)
                print(f"Total file size: {total_length / (1024 * 1024):.2f} MB")

            downloaded = 0
            with open(destination, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if stop_event.is_set():
                        print("Download cancellation requested. Stopping download...")
                        break
                    if chunk:  # Filter out keep-alive chunks
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_length:
                            percent = (downloaded / total_length) * 100
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB "
                                  f"({percent:.2f}%)")
                        else:
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB")
        if stop_event.is_set():
            # Optionally, delete the incomplete file
            if os.path.exists(destination):
                os.remove(destination)
                print(f"Incomplete file '{destination}' has been deleted.")
            print("Download was cancelled gracefully.")
        else:
            print(f"Download completed successfully and saved to '{destination}'.")
    except (
        requests.exceptions.RequestException,  # Catches all requests-related exceptions
        IOError  # Catches file I/O related errors
    ) as e:
        print(f"An error occurred during download: {e}")
        cleanup(destination)

def cleanup(destination):
    """
    Cleans up the incomplete download file if it exists.
    
    :param destination: Path to the incomplete file
    """
    if os.path.exists(destination):
        try:
            os.remove(destination)
            print(f"Incomplete file '{destination}' has been deleted.")
        except OSError as e:
            print(f"Error deleting incomplete file: {e}")

def controller(stop_event, delay):
    """
    Simulates an external trigger to cancel the download after a certain delay.
    
    :param stop_event: threading.Event to signal cancellation
    :param delay: Time in seconds before triggering cancellation
    """
    print(f"Controller will request cancellation in {delay} seconds.")
    time.sleep(delay)
    print("Controller is signaling the download thread to stop.")
    stop_event.set()

def main():
    # URL of a large file for demonstration purposes
    # Replace this with any large file URL for testing
    url = "https://speed.hetzner.de/100MB.bin"  # Example: 100 MB file
    destination = "downloaded_file.bin"

    # Create an Event object to signal the worker thread to stop
    stop_event = threading.Event()

    # Create and start the download thread
    download_thread = threading.Thread(target=download_file, args=(url, destination, stop_event), name="DownloadThread")
    download_thread.start()

    # Create and start the controller thread
    # For example, cancel the download after 5 seconds
    cancel_delay = 5  # seconds
    controller_thread = threading.Thread(target=controller, args=(stop_event, cancel_delay), name="ControllerThread")
    controller_thread.start()

    # Wait for both threads to complete
    download_thread.join()
    controller_thread.join()

    print("Download process has been handled.")

if __name__ == "__main__":
    main()

@yengliong93
Copy link
Contributor Author

yengliong93 commented Oct 8, 2024

Hi @yengliong93 -- I started reviewing since I got an email about this PR, but I now see that it's 'work in progress' -- I'll stop until you're ready for review.

Hi @gblewis1, the current implementation will terminate any OTA that's running on the active thread. I'm thinking whether we should check the OTA type and only terminate the thread if it's the SOTA download-only mode.

another consideration is, there may be an inbc process waiting on the thread to report back to it, so we want to give the thread a chance to clean up gracefully.

here's an example of sending a signal to a thread that's downloading a file

import threading
import requests
import time
import os

def download_file(url, destination, stop_event, chunk_size=1024):
    """
    Downloads a file from the given URL to the destination path.
    Periodically checks if a stop_event is set to cancel the download gracefully.
    
    :param url: URL of the file to download
    :param destination: Local path to save the downloaded file
    :param stop_event: threading.Event to signal cancellation
    :param chunk_size: Size of each chunk to read (in bytes)
    """
    try:
        with requests.get(url, stream=True, timeout=10) as response:
            response.raise_for_status()  # Raise an error for bad status codes
            total_length = response.headers.get('content-length')

            if total_length is None:
                print("Unable to determine the file size.")
                total_length = 0
            else:
                total_length = int(total_length)
                print(f"Total file size: {total_length / (1024 * 1024):.2f} MB")

            downloaded = 0
            with open(destination, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    if stop_event.is_set():
                        print("Download cancellation requested. Stopping download...")
                        break
                    if chunk:  # Filter out keep-alive chunks
                        f.write(chunk)
                        downloaded += len(chunk)
                        if total_length:
                            percent = (downloaded / total_length) * 100
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB "
                                  f"({percent:.2f}%)")
                        else:
                            print(f"Downloaded {downloaded / (1024 * 1024):.2f} MB")
        if stop_event.is_set():
            # Optionally, delete the incomplete file
            if os.path.exists(destination):
                os.remove(destination)
                print(f"Incomplete file '{destination}' has been deleted.")
            print("Download was cancelled gracefully.")
        else:
            print(f"Download completed successfully and saved to '{destination}'.")
    except (
        requests.exceptions.RequestException,  # Catches all requests-related exceptions
        IOError  # Catches file I/O related errors
    ) as e:
        print(f"An error occurred during download: {e}")
        cleanup(destination)

def cleanup(destination):
    """
    Cleans up the incomplete download file if it exists.
    
    :param destination: Path to the incomplete file
    """
    if os.path.exists(destination):
        try:
            os.remove(destination)
            print(f"Incomplete file '{destination}' has been deleted.")
        except OSError as e:
            print(f"Error deleting incomplete file: {e}")

def controller(stop_event, delay):
    """
    Simulates an external trigger to cancel the download after a certain delay.
    
    :param stop_event: threading.Event to signal cancellation
    :param delay: Time in seconds before triggering cancellation
    """
    print(f"Controller will request cancellation in {delay} seconds.")
    time.sleep(delay)
    print("Controller is signaling the download thread to stop.")
    stop_event.set()

def main():
    # URL of a large file for demonstration purposes
    # Replace this with any large file URL for testing
    url = "https://speed.hetzner.de/100MB.bin"  # Example: 100 MB file
    destination = "downloaded_file.bin"

    # Create an Event object to signal the worker thread to stop
    stop_event = threading.Event()

    # Create and start the download thread
    download_thread = threading.Thread(target=download_file, args=(url, destination, stop_event), name="DownloadThread")
    download_thread.start()

    # Create and start the controller thread
    # For example, cancel the download after 5 seconds
    cancel_delay = 5  # seconds
    controller_thread = threading.Thread(target=controller, args=(stop_event, cancel_delay), name="ControllerThread")
    controller_thread.start()

    # Wait for both threads to complete
    download_thread.join()
    controller_thread.join()

    print("Download process has been handled.")

if __name__ == "__main__":
    main()

Try this method in commit but it seems like not working.

First terminal - I run inbc sota -m download-only
Second terminal - I run inbc sota -m cancel

Both INBC terminals are stuck and dispatcher proceeds the request.
image
image

Dispatcher didn't stop the request and proceed.
image

------------- Update
The MQTT logs indicate that there are multiple INBC connections using the same cert to do the connection. It seems that both INBC threads are using the same cert, with the older connection closing and a new one being established repeatedly. Due to this operation, neither thread appears to be receiving any response.

@yengliong93 yengliong93 force-pushed the sota-cancel-mode branch 2 times, most recently from 7350981 to fe1ca80 Compare October 8, 2024 08:08
@yengliong93 yengliong93 changed the title WIP: [NEXMANAGE-737] Enable sota cancel mode. [NEXMANAGE-737] Enable sota cancel mode. Oct 8, 2024
@yengliong93
Copy link
Contributor Author

@gblewis1 I revert back to initial method. Factored out the parsing method into another python file and added unit tests. Please have a review, thanks.

@gblewis1
Copy link
Contributor

gblewis1 commented Oct 8, 2024

@yengliong93 Let's restrict this to just cancelling downloads rather than cancelling full SOTA threads. I'm concerned that stopping a SOTA thread could leave the system in an inconsistent state (e.g. if we're in the middle of an apt-get upgrade command). Also, if there is e.g., an inbc process waiting for SOTA to finish, it will never receive the message saying that SOTA is finished.

Question with regards to PUA: @pillaiabhishek -- how does PUA know there is a download in progress to cancel? We could just issue a download cancel command in all cases:

  • If a download is in progress, it will get canceled and we can start the new download
  • If not, no harm done; no action taken and we can start the new download

@yengliong93
Copy link
Contributor Author

I've just realized that the initial method actually terminates the main thread. The dispatcher service is halted after sending the SIGTERM signal.

@yengliong93 yengliong93 changed the title [NEXMANAGE-737] Enable sota cancel mode. WIP: [NEXMANAGE-737] Enable sota cancel mode. Oct 9, 2024
@yengliong93
Copy link
Contributor Author

PR continue at #567

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants