Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ladislas/feature/ci functional system refactor #1469

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .flake8
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Leka - iOS Monorepo
# Copyright APF France handicap
# SPDX-License-Identifier: Apache-2.0

[flake8]
max-line-length = 120
6 changes: 6 additions & 0 deletions .pylintrc
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Leka - iOS Monorepo
# Copyright APF France handicap
# SPDX-License-Identifier: Apache-2.0

[LOGGING]
disable=logging-fstring-interpolation
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mbed-cli>=1.10.0,<2.0
pyserial>=3,<=3.4
pyserial>=3,<=3.5
intelhex>=2.3.0,<3.0.0
prettytable>=2.0,<3.0
imgtool>=2.0.0,<3.0.0
colorama
128 changes: 128 additions & 0 deletions tools/modules/flash_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0

"""
Utility functions for flashing the OS binary to the device.
"""

import logging
from pathlib import Path
import subprocess
import sys
import shutil
import time

from colorama import Fore, Style


def check_external_tools():
"""
Ensure that required external tools are available.

Raises:
SystemExit: If any required tool is missing.
"""
required_tools = ["openocd", "st-flash"]
missing_tools = [tool for tool in required_tools if not shutil.which(tool)]
if missing_tools:
logging.error(f"Missing required tools: {', '.join(missing_tools)}")
sys.exit(1)


def flash_os_and_reset(os_bin_path: Path) -> bool:
"""
Flash the OS binary and reset the device.

Args:
os_bin_path (Path): Path to the OS binary.

Returns:
bool: True if both flashing and resetting succeed; False otherwise.
"""
try:
flash_os(str(os_bin_path))
return True
except SystemExit:
return False


def flash_os(os_bin_path: str):
"""
Flash the OS binary to the device using OpenOCD.

Args:
os_bin_path (str): Path to the OS binary file.

Raises:
SystemExit: If flashing fails.
"""
print(f"Flashing {os_bin_path}...")
cmd_flash = (
f"openocd -f interface/stlink.cfg "
f"-c 'transport select hla_swd' "
f"-f target/stm32f7x.cfg "
f"-c 'program {os_bin_path} 0x08000000' "
f"-c exit"
)
print(cmd_flash)
flash = subprocess.run(cmd_flash, shell=True, check=False)
if flash.returncode != 0:
print(f"Flashing {os_bin_path}... {Fore.RED}❌{Style.RESET_ALL}")
sys.exit(1)
print(f"Flashing {os_bin_path}... {Fore.GREEN}βœ…{Style.RESET_ALL}")

time.sleep(1)

print("Resetting robot...")
cmd_reset = (
"openocd -f interface/stlink.cfg "
"-c 'transport select hla_swd' "
"-f target/stm32f7x.cfg "
"-c init -c 'reset run' "
"-c exit"
)
print(cmd_reset)
reset = subprocess.run(cmd_reset, shell=True, check=False)
if reset.returncode != 0:
print(f"Resetting robot... {Fore.RED}❌{Style.RESET_ALL}")
sys.exit(1)
print(f"Resetting robot... {Fore.GREEN}βœ…{Style.RESET_ALL}")

time.sleep(1)


def erase_flash():
"""
Erase the flash memory of the device using st-flash.

Raises:
SystemExit: If erasing fails.
"""
print("Erasing flash...")
cmd_erase = "st-flash --connect-under-reset --reset erase"
ret = subprocess.run(cmd_erase, shell=True)
if ret.returncode != 0:
print(f"Erasing flash... {Fore.RED}❌{Style.RESET_ALL}")
sys.exit(1)
print(f"Erasing flash... {Fore.GREEN}βœ…{Style.RESET_ALL}")


def print_end_success(message: str):
"""
Print a success message in cyan with a checkmark.

Args:
message (str): The message to print.
"""
print(f"{Fore.CYAN}{message}... βœ…{Style.RESET_ALL}")


def print_end_failure(message: str):
"""
Print a failure message in red with a cross mark.

Args:
message (str): The message to print.
"""
print(f"{Fore.RED}{message}... ❌{Style.RESET_ALL}")
28 changes: 28 additions & 0 deletions tools/modules/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0


"""
Configure logging settings based on verbosity level.
"""


import logging
import sys


def configure_logging(verbose: bool):
"""
Configure logging settings based on verbosity level.

Args:
verbose (bool): If True, set logging level to DEBUG; otherwise, INFO.
"""
logging.basicConfig(
level=logging.DEBUG if verbose else logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
if verbose:
logging.debug("Verbose logging enabled.")
139 changes: 139 additions & 0 deletions tools/modules/serial_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
# Leka - LekaOS
# Copyright 2024 APF France handicap
# SPDX-License-Identifier: Apache-2.0


"""
Utility functions for serial communication.
"""


import glob
import sys
from time import sleep
from typing import List, Optional
import serial
from colorama import Fore, Style

SERIAL_TIMEOUT = 0.1 # seconds


def connect_serial(port_pattern: str) -> serial.Serial:
"""
Connect to the first serial port matching the given pattern.

Args:
port_pattern (str): Glob pattern to match serial ports.

Returns:
serial.Serial: An open serial connection.

Raises:
SystemExit: If unable to connect to the serial port.
"""
ports = glob.glob(port_pattern)
serial_port = ports[0] if ports else port_pattern

try:
com = serial.Serial(serial_port, 115200, timeout=SERIAL_TIMEOUT)
print(f"Connected to {com.name}... {Fore.GREEN}βœ…{Style.RESET_ALL}")
return com
except serial.serialutil.SerialException as error:
print(
f"Error connecting to {serial_port}: {error}... {Fore.RED}❌{Style.RESET_ALL}"
)
sys.exit(1)


def reset_buffer(com: serial.Serial):
"""
Reset the serial input and output buffers and send a break signal.

Args:
com (serial.Serial): The serial connection to reset.
"""
print("Resetting COM buffer...")
try:
com.reset_input_buffer()
com.reset_output_buffer()
com.send_break(duration=1)
sleep(1)
print(f"Resetting COM buffer... {Fore.GREEN}βœ…{Style.RESET_ALL}")
except serial.SerialException as e:
print(f"Error resetting COM buffer: {e}... {Fore.RED}❌{Style.RESET_ALL}")
sys.exit(1)


def read_output_serial(com: serial.Serial) -> str:
"""
Read a line from the serial connection.

Args:
com (serial.Serial): The serial connection to read from.

Returns:
str: Decoded line from serial output.
"""
try:
data = com.readline().decode("utf-8").strip()
return data
except serial.SerialException as e:
print(f"Serial read error: {e}")
return ""


def wait_for_response(com: serial.Serial, response_timeout: float) -> Optional[str]:
"""
Wait for a response from the device within the specified timeout.

Args:
com (serial.Serial): The serial connection to read from.
response_timeout (float): Timeout in seconds.

Returns:
Optional[str]: The received data if any; otherwise, None.
"""
retries = int(response_timeout / 0.1)
for _ in range(retries):
sleep(0.1)
data = read_output_serial(com)
if data:
return data
return None


def wait_for_system_to_sleep(com: serial.Serial, duration: int) -> List[str]:
"""
Wait for the system to run for a specified duration, collecting relevant serial data.

Args:
com (serial.Serial): The serial connection to read from.
duration (int): Duration in seconds to wait.

Returns:
List[str]: Collected lines containing 'watchdog'.
"""
print("Waiting for system to run...")
data = []
for second in range(duration):
if com.in_waiting > 0:
print(f"{Fore.GREEN}β€’{Style.RESET_ALL}", end="", flush=True)
lines = com.readlines()
for line in lines:
decoded_line = line.decode("utf-8", errors="replace").rstrip()
data.append(decoded_line)
else:
print("β€’", end="", flush=True)

if (second + 1) % 60 == 0 and (second + 1) != duration:
print()

sleep(1)

print()
filtered_data = [line for line in data if "watchdog" in line][-10:]
print("\n".join(filtered_data))
print(
f"Waiting for system to run for {duration} seconds... {Fore.GREEN}βœ…{Style.RESET_ALL}"
)
return filtered_data
Loading
Loading