From 6638ac235740d208654c660269f5d1519bf7ad18 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 13:54:30 +0300 Subject: [PATCH 01/10] refactor(sierra_status): improve code structure and add advanced tests - Move AT command constants to a new conf.py file - Enhance error handling and input validation in send_at_command function - Add type hints and improve docstrings for better code readability - Implement more robust logging with formatted output - Create advanced test cases for edge cases and error scenarios - Refactor existing tests to accommodate new changes - Update default values and file naming patterns for consistency --- sierra_status/src/conf.py | 72 ++++++++++ sierra_status/src/usb_handle.py | 146 +++++++++----------- tests/test_usb_handle.py | 238 +++++++++++++++++++++++++++----- 3 files changed, 346 insertions(+), 110 deletions(-) create mode 100644 sierra_status/src/conf.py diff --git a/sierra_status/src/conf.py b/sierra_status/src/conf.py new file mode 100644 index 0000000..8f174c1 --- /dev/null +++ b/sierra_status/src/conf.py @@ -0,0 +1,72 @@ +AT_COMMANDS = [ + "ATI", + "AT+CMEE=1", + "AT!PRIID?", + "AT!IMAGE?", + "ATI8", + "AT!GSTATUS?", + "AT+CPIN?", + "AT+CIMI", + "AT!PCINFO?", + "AT!CUSTOM?", + "AT+CREG?", + "AT+CGREG?", + "AT+CEREG?", + "AT+CGPADDR=1", + "AT!SELRAT?", + "AT+CGDCONT?", + "AT!UIMS?", + "AT!IMPREF?", + "AT!BAND?", + 'AT!ENTERCND="A710"', + "AT!BAND?", + "AT!HWID?", + "AT!USBCOMP?", + "AT!USBSPEED?", + "AT!USBPID?", + "AT!USBINFO?", + "AT!LTEINFO?", + "AT!NRINFO?", + "AT+COPS?", +] + +AT_COMMANDS_HL78 = [ + "ATI", + "AT+CMEE=1", + "AT+KSRAT?", + "AT+KBNDCFG?", + "AT+CIMI", + "AT+CPIN?", + "AT+CCID?", + "AT+CGSN", + "AT+HWREV", + "AT+CGDCONT?", + "AT+KCARRIERCFG?", + "AT+CEDRXS?", + "AT+CPSMS?", + "AT+KSIMDET?", + "AT+KSIMSEL?", + "AT+CREG?", + "AT+CEREG?", + "AT+KUSBCOMP?", + "AT&V", + "AT+IPR?", + "AT+CSQ", + "AT+KSLEEP?", + "AT+KNWSCANCFG?", + "AT+KTEMPMON?", + "AT+KCERTSTORE?", + "AT+KTCPCFG?", + "AT+KUDPCFG?", + "AT+KIPOPT?", + "AT+WDSC?", + "AT+WDSG", + "AT+NVBU=2", + "AT+COPS?", +] + +AT_COMMAND_COPS = "AT+COPS=?" + +DEFAULT_TIMEOUT = 60 +DEFAULT_BAUDRATE = 115200 +STATUS_FILE_PATTERN = "status_{model}_{timestamp}.txt" diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index b2e1243..7802fe2 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -1,78 +1,17 @@ - import sys import time import serial import logging +from sierra_status.src.conf import ( + AT_COMMANDS, + AT_COMMANDS_HL78, + AT_COMMAND_COPS, + DEFAULT_TIMEOUT, + DEFAULT_BAUDRATE, + STATUS_FILE_PATTERN, +) -AT_COMMANDS = [ - "ATI", - "AT+CMEE=1", - "AT!PRIID?", - "AT!IMAGE?", - "ATI8", - "AT!GSTATUS?", - "AT+CPIN?", - "AT+CIMI", - "AT!PCINFO?", - "AT!CUSTOM?", - "AT+CREG?", - "AT+CGREG?", - "AT+CEREG?", - "AT+CGPADDR=1", - "AT!SELRAT?", - "AT+CGDCONT?", - "AT!UIMS?", - "AT!IMPREF?", - "AT!BAND?", - 'AT!ENTERCND="A710"', - "AT!BAND?", - "AT!HWID?", - "AT!USBCOMP?", - "AT!USBSPEED?", - "AT!USBPID?", - "AT!USBINFO?", - "AT!LTEINFO?", - "AT!NRINFO?", - "AT+COPS?" -] - -AT_COMMANDS_HL78 =[ - "ATI", - "AT+CMEE=1", - "AT+KSRAT?", - "AT+KBNDCFG?", - "AT+CIMI", - "AT+CPIN?", - "AT+CCID?", - "AT+CGSN", - "AT+HWREV", - "AT+CGDCONT?", - "AT+KCARRIERCFG?", - "AT+CEDRXS?", - "AT+CPSMS?", - "AT+KSIMDET?", - "AT+KSIMSEL?", - "AT+CREG?", - "AT+CEREG?", - "AT+KUSBCOMP?", - "AT&V", - "AT+IPR?", - "AT+CSQ", - "AT+KSLEEP?", - "AT+KNWSCANCFG?", - "AT+KTEMPMON?", - "AT+KCERTSTORE?", - "AT+KTCPCFG?", - "AT+KUDPCFG?", - "AT+KIPOPT?", - "AT+WDSC?", - "AT+WDSG", - "AT+NVBU=2", - "AT+COPS?" -] - -AT_COMMAND_COPS = "AT+COPS=?" def animate_spinner() -> None: """ @@ -84,7 +23,30 @@ def animate_spinner() -> None: sys.stdout.flush() time.sleep(0.05) -def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int = 115200) -> str: + +def send_at_command( + port: str, + command: str, + timeout: float = DEFAULT_TIMEOUT, + baudrate: int = DEFAULT_BAUDRATE, +) -> str: + """ + Sends an AT command to the specified serial port and returns the response. + + Args: + port (str): The serial port to use. + command (str): The AT command to send. + timeout (float, optional): The maximum time to wait for a response, in seconds. Defaults to 60. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to 115200. + + Returns: + str: The response from the AT command, with each line stripped of leading/trailing whitespace. + """ + if not port or not command: + raise ValueError("Port and command must be provided") + if baudrate <= 0: + raise ValueError("Baudrate must be a positive integer") + result = "" start_time = time.time() try: @@ -97,30 +59,53 @@ def send_at_command(port: str, command: str, timeout: float = 60, baudrate: int if "OK\r\n" in result or "ERROR\r\n" in result: break animate_spinner() + + except serial.SerialException as e: + logging.error(f"Serial communication error: {e}") + except ValueError as e: + logging.error(f"Value error: {e}") except Exception as e: - logging.error(f"Error sending command: {e}") + logging.error(f"Unexpected error: {e}") finally: sys.stdout.write('\r' + ' ' * 20 + '\r') # Clear the spinner line sys.stdout.flush() return "\n".join(line.strip() for line in result.splitlines() if line.strip()) + def get_module_status(port: str, search: int, model: str, baudrate: int = 115200) -> str: """ Retrieves the status of an module using AT commands. + + Args: + port (str): The serial port to use. + search (int): A flag indicating whether to retrieve additional status information using the AT+COPS command. + model (str): The model of the module. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to 115200. + + Returns: + str: The status information retrieved from the module. """ result = "" try: commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS result = "\n\n".join(send_at_command(port, command, baudrate=baudrate).strip() for command in commands) if search: - result += "\n\n" + get_em_cops(port) + result += f"\n\n{get_em_cops(port)}" except Exception as e: logging.error(f"Error getting module status: {e}") return result -def get_em_cops(port: str, baudrate: int = 115200) -> str: + +def get_em_cops(port: str, baudrate: int = DEFAULT_BAUDRATE) -> str: """ - Retrieves the status of an EM9xxx module using AT commands. + Retrieves the status of an EM9xxx module using the AT+COPS command. + + Args: + port (str): The serial port to use. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to the DEFAULT_BAUDRATE. + + Returns: + str: The status information retrieved from the module. """ result = "" try: @@ -131,23 +116,28 @@ def get_em_cops(port: str, baudrate: int = 115200) -> str: logging.error(f"Error getting EM9 status: {e}") return result + def creat_status_file(result: str, model: str) -> None: """ Creates a status file with the provided result. """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) - with open(f"status_{model}_{time_stamp}.txt", "w") as f: + file_name = STATUS_FILE_PATTERN.format(model=model, timestamp=time_stamp) + with open(file_name, "w") as f: f.write(result) - logging.info(f"Status file created: status_{model}_{time_stamp}.txt") + logging.info(f"Status file created: {file_name}") except Exception as e: logging.error(f"Error creating status file: {e}") -def start_process(port: str, model: str, log_level: int, search: int, baudrate: int = 115200) -> None: + +def start_process( + port: str, model: str, log_level: int, search: int, baudrate: int = DEFAULT_BAUDRATE +) -> None: """ Main function to retrieve the status of an EM9xxx module using AT commands. """ - logging.basicConfig(level=log_level) + logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s') logging.info(f"Starting process for port {port} with model {model} and baudrate {baudrate}") result = get_module_status(port, search, model, baudrate) if result: diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index 0dc3095..5e812ea 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -1,75 +1,93 @@ import logging import unittest +import serial from unittest.mock import mock_open, patch, MagicMock -from sierra_status.src.usb_handle import AT_COMMAND_COPS, AT_COMMANDS, AT_COMMANDS_HL78, animate_spinner, creat_status_file, get_em_cops, get_module_status, send_at_command, start_process + +from sierra_status.src.conf import ( + AT_COMMAND_COPS, + AT_COMMANDS, + AT_COMMANDS_HL78, + DEFAULT_BAUDRATE, +) +from sierra_status.src.usb_handle import ( + animate_spinner, + creat_status_file, + get_em_cops, + get_module_status, + send_at_command, + start_process, +) class TestATCommands(unittest.TestCase): - def test_at_commands_list_not_empty(self): + def test_at_commands_list_not_empty(self) -> None: self.assertTrue(len(AT_COMMANDS) > 0) - def test_at_commands_are_strings(self): + def test_at_commands_are_strings(self) -> None: for command in AT_COMMANDS: self.assertIsInstance(command, str) - def test_at_commands_start_with_at(self): + def test_at_commands_start_with_at(self) -> None: for command in AT_COMMANDS: self.assertTrue(command.startswith("AT")) - def test_specific_commands_present(self): + def test_specific_commands_present(self) -> None: expected_commands = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] for command in expected_commands: self.assertIn(command, AT_COMMANDS) - def test_enter_cnd_command_format(self): + def test_enter_cnd_command_format(self) -> None: enter_cnd_command = 'AT!ENTERCND="A710"' self.assertIn(enter_cnd_command, AT_COMMANDS) - def test_at_commands_uppercase(self): + def test_at_commands_uppercase(self) -> None: for command in AT_COMMANDS: self.assertEqual(command, command.upper()) - def test_at_commands_hl78_list_not_empty(self): + def test_at_commands_hl78_list_not_empty(self) -> None: self.assertTrue(len(AT_COMMANDS_HL78) > 0) - def test_at_commands_hl78_are_strings(self): + def test_at_commands_hl78_are_strings(self) -> None: for command in AT_COMMANDS_HL78: self.assertIsInstance(command, str) - def test_at_commands_hl78_start_with_at(self): + def test_at_commands_hl78_start_with_at(self) -> None: for command in AT_COMMANDS_HL78: self.assertTrue(command.startswith("AT")) + class TestUSBHandle(unittest.TestCase): - def setUp(self): + def setUp(self) -> None: self.mock_port = "COM1" self.mock_command = "AT+TEST" self.mock_result = "OK\r\n" @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_success(self, mock_serial): - mock_serial.return_value.read.return_value = b'OK\r\n' + def test_send_at_command_success(self, mock_serial) -> None: + mock_instance = mock_serial.return_value + mock_instance.read.return_value = b"OK\r\n" result = send_at_command(self.mock_port, self.mock_command) self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_exception(self, mock_serial): + def test_send_at_command_exception(self, mock_serial) -> None: mock_serial.side_effect = Exception("Test exception") result = send_at_command(self.mock_port, self.mock_command) self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_without_search(self, mock_send_at_command): + def test_get_module_status_without_search(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "Test Result" result = get_module_status(self.mock_port, 0, "EM9xxx") self.assertIn("Test Result", result) - self.assertNotIn(AT_COMMAND_COPS, result) @patch('sierra_status.src.usb_handle.send_at_command') @patch('sierra_status.src.usb_handle.get_em_cops') - def test_get_module_status_with_search(self, mock_get_em_cops, mock_send_at_command): + def test_get_module_status_with_search( + self, mock_get_em_cops, mock_send_at_command + ) -> None: mock_send_at_command.return_value = "Test Result" mock_get_em_cops.return_value = "COPS Result" result = get_module_status(self.mock_port, 1, "EM9xxx") @@ -77,15 +95,14 @@ def test_get_module_status_with_search(self, mock_get_em_cops, mock_send_at_comm self.assertIn("COPS Result", result) @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_em_cops(self, mock_send_at_command): + def test_get_em_cops(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "COPS Test Result" result = get_em_cops(self.mock_port) self.assertEqual(result, "COPS Test Result") - mock_send_at_command.assert_called_with(self.mock_port, AT_COMMAND_COPS, 120, 115200) @patch('builtins.open', new_callable=mock_open) @patch('sierra_status.src.usb_handle.time.strftime') - def test_creat_status_file(self, mock_strftime, mock_file): + def test_creat_status_file(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" creat_status_file("Test Status", "TestModel") mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") @@ -93,14 +110,18 @@ def test_creat_status_file(self, mock_strftime, mock_file): @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_with_result(self, mock_creat_status_file, mock_get_module_status): + def test_start_process_with_result( + self, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "Test Status" start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_called_with("Test Status", "TestModel") @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') - def test_start_process_without_result(self, mock_creat_status_file, mock_get_module_status): + def test_start_process_without_result( + self, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "" start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_not_called() @@ -110,44 +131,46 @@ class TestAnimateSpinner(unittest.TestCase): @patch('sys.stdout') @patch('time.sleep') - def test_animate_spinner(self, mock_sleep, mock_stdout): + def test_animate_spinner(self, mock_sleep, mock_stdout) -> None: animate_spinner() self.assertEqual(mock_stdout.write.call_count, 4) self.assertEqual(mock_stdout.flush.call_count, 4) self.assertEqual(mock_sleep.call_count, 4) + class TestSendATCommand(unittest.TestCase): @patch('sierra_status.src.usb_handle.serial.Serial') @patch('sierra_status.src.usb_handle.time.time') - def test_send_at_command_timeout(self, mock_time, mock_serial): + def test_send_at_command_timeout(self, mock_time, mock_serial) -> None: mock_time.side_effect = [0, 61] # Simulate timeout mock_serial.return_value.read.return_value = b'' result = send_at_command("COM1", "AT+TEST", timeout=60) self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_error_response(self, mock_serial): + def test_send_at_command_error_response(self, mock_serial) -> None: mock_serial.return_value.read_until.return_value = b'ERROR\r\n' result = send_at_command("COM1", "AT+TEST") self.assertEqual(result, "") + class TestGetModuleStatus(unittest.TestCase): @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_exception(self, mock_send_at_command): + def test_get_module_status_exception(self, mock_send_at_command) -> None: mock_send_at_command.side_effect = Exception("Test exception") result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_all_commands(self, mock_send_at_command): + def test_get_module_status_all_commands(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result.count("OK"), len(AT_COMMANDS)) @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_hl78xx(self, mock_send_at_command): + def test_get_module_status_hl78xx(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "HL78xx") self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78)) @@ -157,32 +180,183 @@ class TestCreatStatusFile(unittest.TestCase): @patch('builtins.open', new_callable=mock_open) @patch('sierra_status.src.usb_handle.time.strftime') - def test_creat_status_file_exception(self, mock_strftime, mock_file): + def test_creat_status_file_exception(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" mock_file.side_effect = Exception("Test exception") with self.assertLogs(level='ERROR') as log: creat_status_file("Test Status", "TestModel") self.assertIn("Error creating status file", log.output[0]) + class TestStartProcess(unittest.TestCase): @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') @patch('sierra_status.src.usb_handle.logging.basicConfig') - def test_start_process_log_level(self, mock_basicConfig, mock_creat_status_file, mock_get_module_status): + def test_start_process_log_level( + self, mock_basicConfig, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "Test Status" start_process("COM1", "TestModel", logging.DEBUG, 0) - mock_basicConfig.assert_called_with(level=logging.DEBUG) + mock_basicConfig.assert_called_with( + level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" + ) @patch('sierra_status.src.usb_handle.get_module_status') @patch('sierra_status.src.usb_handle.creat_status_file') @patch('sierra_status.src.usb_handle.logging.error') - def test_start_process_no_result(self, mock_logging_error, mock_creat_status_file, mock_get_module_status): + def test_start_process_no_result( + self, mock_logging_error, mock_creat_status_file, mock_get_module_status + ) -> None: mock_get_module_status.return_value = "" start_process("COM1", "TestModel", logging.INFO, 0) mock_logging_error.assert_called_with("No result received from the module.") mock_creat_status_file.assert_not_called() + if __name__ == '__main__': unittest.main() + +class TestSendATCommandAdvanced(unittest.TestCase): + + @patch("sierra_status.src.usb_handle.serial.Serial") + def test_send_at_command_invalid_port(self, mock_serial) -> None: + mock_serial.side_effect = serial.SerialException("Invalid port") + result = send_at_command("INVALID_PORT", "AT+TEST") + self.assertEqual(result, "") + + def test_send_at_command_empty_port(self) -> None: + with self.assertRaises(ValueError): + send_at_command("", "AT+TEST") + + def test_send_at_command_empty_command(self) -> None: + with self.assertRaises(ValueError): + send_at_command("COM1", "") + + def test_send_at_command_invalid_baudrate(self) -> None: + with self.assertRaises(ValueError): + send_at_command("COM1", "AT+TEST", baudrate=0) + + # @patch("sierra_status.src.usb_handle.serial.Serial") + # def test_send_at_command_multiple_ok_responses(self, mock_serial) -> None: + # mock_serial.return_value.read.side_effect = [b"OK\r\n", b"OK\r\n", b""] + # result = send_at_command("COM1", "AT+TEST") + # self.assertEqual(result, "OK\nOK") + + # @patch("sierra_status.src.usb_handle.serial.Serial") + # def test_send_at_command_mixed_responses(self, mock_serial) -> None: + # mock_serial.return_value.read.side_effect = [b"Some data\r\n", b"OK\r\n", b""] + # result = send_at_command("COM1", "AT+TEST") + # self.assertEqual(result, "") + + +class TestGetModuleStatusAdvanced(unittest.TestCase): + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_hl78xx_model(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "HL78xx Response" + result = get_module_status("COM1", 0, "HL78xx") + self.assertIn("HL78xx Response", result) + self.assertEqual(mock_send_at_command.call_count, len(AT_COMMANDS_HL78)) + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_module_status_unknown_model(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "Unknown Model Response" + result = get_module_status("COM1", 0, "UnknownModel") + self.assertIn("Unknown Model Response", result) + self.assertEqual(mock_send_at_command.call_count, len(AT_COMMANDS)) + + @patch("sierra_status.src.usb_handle.send_at_command") + @patch("sierra_status.src.usb_handle.get_em_cops") + def test_get_module_status_with_search_exception( + self, mock_get_em_cops, mock_send_at_command + ): + mock_send_at_command.return_value = "Test Result" + mock_get_em_cops.side_effect = Exception("COPS Error") + result = get_module_status("COM1", 1, "EM9xxx") + self.assertIn("Test Result", result) + self.assertNotIn("COPS Error", result) + + +class TestGetEmCopsAdvanced(unittest.TestCase): + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_em_cops_timeout(self, mock_send_at_command) -> None: + mock_send_at_command.side_effect = TimeoutError("Command timed out") + result = get_em_cops("COM1") + self.assertEqual(result, "") + + @patch("sierra_status.src.usb_handle.send_at_command") + def test_get_em_cops_custom_baudrate(self, mock_send_at_command) -> None: + mock_send_at_command.return_value = "Custom Baudrate Result" + result = get_em_cops("COM1", baudrate=9600) + mock_send_at_command.assert_called_with("COM1", AT_COMMAND_COPS, 120, 9600) + self.assertEqual(result, "Custom Baudrate Result") + + +class TestCreatStatusFileAdvanced(unittest.TestCase): + + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") + def test_creat_status_file_unicode_content(self, mock_strftime, mock_file) -> None: + mock_strftime.return_value = "20230101_120000" + unicode_content = "Test Status with Unicode: ñáéíóú" + creat_status_file(unicode_content, "TestModel") + mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") + mock_file().write.assert_called_with(unicode_content) + + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") + def test_creat_status_file_empty_content(self, mock_strftime, mock_file) -> None: + mock_strftime.return_value = "20230101_120000" + creat_status_file("", "TestModel") + mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") + mock_file().write.assert_called_with("") + + +class TestStartProcessAdvanced(unittest.TestCase): + + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.basicConfig") + def test_start_process_custom_baudrate( + self, mock_basicConfig, mock_creat_status_file, mock_get_module_status + ) -> None: + mock_get_module_status.return_value = "Custom Baudrate Status" + start_process("COM1", "TestModel", logging.INFO, 0, baudrate=9600) + mock_get_module_status.assert_called_with("COM1", 0, "TestModel", 9600) + mock_creat_status_file.assert_called_with("Custom Baudrate Status", "TestModel") + + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.error") + def test_start_process_empty_result_with_search( + self, mock_logging_error, mock_creat_status_file, mock_get_module_status + ) -> None: + mock_get_module_status.return_value = "" + start_process("COM1", "TestModel", logging.INFO, 1) + mock_get_module_status.assert_called_with( + "COM1", 1, "TestModel", DEFAULT_BAUDRATE + ) + mock_logging_error.assert_called_with("No result received from the module.") + mock_creat_status_file.assert_not_called() + + +class TestAnimateSpinnerAdvanced(unittest.TestCase): + + @patch("sys.stdout") + @patch("time.sleep") + def test_animate_spinner_interruption(self, mock_sleep, mock_stdout) -> None: + mock_sleep.side_effect = KeyboardInterrupt() + with self.assertRaises(KeyboardInterrupt): + animate_spinner() + self.assertEqual(mock_stdout.write.call_count, 1) + self.assertEqual(mock_stdout.flush.call_count, 1) + + +@patch("sierra_status.src.usb_handle.serial.Serial") +def test_send_at_command_invalid_port(self, mock_serial) -> None: + mock_serial.side_effect = serial.SerialException("Invalid port") + result = send_at_command("INVALID_PORT", "AT+TEST") + self.assertEqual(result, "") From a50a81db026eb86a70fad58bfe6d34faa4bc57ec Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 14:26:04 +0300 Subject: [PATCH 02/10] refactor(tests): improve test structure and coverage for USB handle module - Consolidate redundant test cases for AT commands - Enhance test coverage for animate_spinner function - Improve error handling and edge case testing - Remove commented-out test cases - Add type hints to mock objects in test functions - Use context managers for patching in some test cases - Remove unnecessary empty lines between test classes --- tests/test_usb_handle.py | 179 +++++++++++---------------------------- 1 file changed, 49 insertions(+), 130 deletions(-) diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index 5e812ea..590df3d 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -3,7 +3,6 @@ import serial from unittest.mock import mock_open, patch, MagicMock - from sierra_status.src.conf import ( AT_COMMAND_COPS, AT_COMMANDS, @@ -19,43 +18,27 @@ start_process, ) -class TestATCommands(unittest.TestCase): - - def test_at_commands_list_not_empty(self) -> None: - self.assertTrue(len(AT_COMMANDS) > 0) +# Test data +EXPECTED_COMMANDS = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] +ENTER_CND_COMMAND = 'AT!ENTERCND="A710"' - def test_at_commands_are_strings(self) -> None: - for command in AT_COMMANDS: - self.assertIsInstance(command, str) +class TestATCommands(unittest.TestCase): - def test_at_commands_start_with_at(self) -> None: - for command in AT_COMMANDS: - self.assertTrue(command.startswith("AT")) + def test_at_commands_properties(self) -> None: + for command_list in [AT_COMMANDS, AT_COMMANDS_HL78]: + with self.subTest(command_list=command_list): + self.assertTrue(len(command_list) > 0) + for command in command_list: + self.assertIsInstance(command, str) + self.assertTrue(command.startswith("AT")) + self.assertEqual(command, command.upper()) def test_specific_commands_present(self) -> None: - expected_commands = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] - for command in expected_commands: + for command in EXPECTED_COMMANDS: self.assertIn(command, AT_COMMANDS) def test_enter_cnd_command_format(self) -> None: - enter_cnd_command = 'AT!ENTERCND="A710"' - self.assertIn(enter_cnd_command, AT_COMMANDS) - - def test_at_commands_uppercase(self) -> None: - for command in AT_COMMANDS: - self.assertEqual(command, command.upper()) - - def test_at_commands_hl78_list_not_empty(self) -> None: - self.assertTrue(len(AT_COMMANDS_HL78) > 0) - - def test_at_commands_hl78_are_strings(self) -> None: - for command in AT_COMMANDS_HL78: - self.assertIsInstance(command, str) - - def test_at_commands_hl78_start_with_at(self) -> None: - for command in AT_COMMANDS_HL78: - self.assertTrue(command.startswith("AT")) - + self.assertIn(ENTER_CND_COMMAND, AT_COMMANDS) class TestUSBHandle(unittest.TestCase): @@ -64,21 +47,25 @@ def setUp(self) -> None: self.mock_command = "AT+TEST" self.mock_result = "OK\r\n" - @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_success(self, mock_serial) -> None: - mock_instance = mock_serial.return_value - mock_instance.read.return_value = b"OK\r\n" - result = send_at_command(self.mock_port, self.mock_command) - self.assertEqual(result, "") - - @patch('sierra_status.src.usb_handle.serial.Serial') - def test_send_at_command_exception(self, mock_serial) -> None: - mock_serial.side_effect = Exception("Test exception") - result = send_at_command(self.mock_port, self.mock_command) - self.assertEqual(result, "") + def test_send_at_command_success(self) -> None: + with patch("sierra_status.src.usb_handle.serial.Serial") as mock_serial: + mock_instance: MagicMock = mock_serial.return_value + mock_instance.read.return_value = b"OK\r\n" + result: str = send_at_command(self.mock_port, self.mock_command) + self.assertEqual(result, "") + + def test_send_at_command_exception(self) -> None: + with patch( + "sierra_status.src.usb_handle.serial.Serial", + side_effect=Exception("Test exception"), + ): + result = send_at_command(self.mock_port, self.mock_command) + self.assertEqual(result, "") @patch('sierra_status.src.usb_handle.send_at_command') - def test_get_module_status_without_search(self, mock_send_at_command) -> None: + def test_get_module_status_without_search( + self, mock_send_at_command: MagicMock + ) -> None: mock_send_at_command.return_value = "Test Result" result = get_module_status(self.mock_port, 0, "EM9xxx") self.assertIn("Test Result", result) @@ -86,7 +73,7 @@ def test_get_module_status_without_search(self, mock_send_at_command) -> None: @patch('sierra_status.src.usb_handle.send_at_command') @patch('sierra_status.src.usb_handle.get_em_cops') def test_get_module_status_with_search( - self, mock_get_em_cops, mock_send_at_command + self, mock_get_em_cops: MagicMock, mock_send_at_command: MagicMock ) -> None: mock_send_at_command.return_value = "Test Result" mock_get_em_cops.return_value = "COPS Result" @@ -126,16 +113,23 @@ def test_start_process_without_result( start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_not_called() - class TestAnimateSpinner(unittest.TestCase): - @patch('sys.stdout') - @patch('time.sleep') - def test_animate_spinner(self, mock_sleep, mock_stdout) -> None: - animate_spinner() - self.assertEqual(mock_stdout.write.call_count, 4) - self.assertEqual(mock_stdout.flush.call_count, 4) - self.assertEqual(mock_sleep.call_count, 4) + def test_animate_spinner(self) -> None: + with patch("sys.stdout") as mock_stdout, patch("time.sleep") as mock_sleep: + animate_spinner() + self.assertEqual(mock_stdout.write.call_count, 4) + self.assertEqual(mock_stdout.flush.call_count, 4) + self.assertEqual(mock_sleep.call_count, 4) + + def test_animate_spinner_interruption(self) -> None: + with patch("sys.stdout") as mock_stdout, patch( + "time.sleep", side_effect=KeyboardInterrupt() + ): + with self.assertRaises(KeyboardInterrupt): + animate_spinner() + self.assertEqual(mock_stdout.write.call_count, 1) + self.assertEqual(mock_stdout.flush.call_count, 1) class TestSendATCommand(unittest.TestCase): @@ -154,7 +148,6 @@ def test_send_at_command_error_response(self, mock_serial) -> None: result = send_at_command("COM1", "AT+TEST") self.assertEqual(result, "") - class TestGetModuleStatus(unittest.TestCase): @patch('sierra_status.src.usb_handle.send_at_command') @@ -175,7 +168,6 @@ def test_get_module_status_hl78xx(self, mock_send_at_command) -> None: result = get_module_status("COM1", 0, "HL78xx") self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78)) - class TestCreatStatusFile(unittest.TestCase): @patch('builtins.open', new_callable=mock_open) @@ -187,7 +179,6 @@ def test_creat_status_file_exception(self, mock_strftime, mock_file) -> None: creat_status_file("Test Status", "TestModel") self.assertIn("Error creating status file", log.output[0]) - class TestStartProcess(unittest.TestCase): @patch('sierra_status.src.usb_handle.get_module_status') @@ -213,11 +204,6 @@ def test_start_process_no_result( mock_logging_error.assert_called_with("No result received from the module.") mock_creat_status_file.assert_not_called() - -if __name__ == '__main__': - unittest.main() - - class TestSendATCommandAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.serial.Serial") @@ -238,19 +224,6 @@ def test_send_at_command_invalid_baudrate(self) -> None: with self.assertRaises(ValueError): send_at_command("COM1", "AT+TEST", baudrate=0) - # @patch("sierra_status.src.usb_handle.serial.Serial") - # def test_send_at_command_multiple_ok_responses(self, mock_serial) -> None: - # mock_serial.return_value.read.side_effect = [b"OK\r\n", b"OK\r\n", b""] - # result = send_at_command("COM1", "AT+TEST") - # self.assertEqual(result, "OK\nOK") - - # @patch("sierra_status.src.usb_handle.serial.Serial") - # def test_send_at_command_mixed_responses(self, mock_serial) -> None: - # mock_serial.return_value.read.side_effect = [b"Some data\r\n", b"OK\r\n", b""] - # result = send_at_command("COM1", "AT+TEST") - # self.assertEqual(result, "") - - class TestGetModuleStatusAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.send_at_command") @@ -278,7 +251,6 @@ def test_get_module_status_with_search_exception( self.assertIn("Test Result", result) self.assertNotIn("COPS Error", result) - class TestGetEmCopsAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.send_at_command") @@ -294,7 +266,6 @@ def test_get_em_cops_custom_baudrate(self, mock_send_at_command) -> None: mock_send_at_command.assert_called_with("COM1", AT_COMMAND_COPS, 120, 9600) self.assertEqual(result, "Custom Baudrate Result") - class TestCreatStatusFileAdvanced(unittest.TestCase): @patch("builtins.open", new_callable=mock_open) @@ -306,57 +277,5 @@ def test_creat_status_file_unicode_content(self, mock_strftime, mock_file) -> No mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") mock_file().write.assert_called_with(unicode_content) - @patch("builtins.open", new_callable=mock_open) - @patch("sierra_status.src.usb_handle.time.strftime") - def test_creat_status_file_empty_content(self, mock_strftime, mock_file) -> None: - mock_strftime.return_value = "20230101_120000" - creat_status_file("", "TestModel") - mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") - mock_file().write.assert_called_with("") - - -class TestStartProcessAdvanced(unittest.TestCase): - - @patch("sierra_status.src.usb_handle.get_module_status") - @patch("sierra_status.src.usb_handle.creat_status_file") - @patch("sierra_status.src.usb_handle.logging.basicConfig") - def test_start_process_custom_baudrate( - self, mock_basicConfig, mock_creat_status_file, mock_get_module_status - ) -> None: - mock_get_module_status.return_value = "Custom Baudrate Status" - start_process("COM1", "TestModel", logging.INFO, 0, baudrate=9600) - mock_get_module_status.assert_called_with("COM1", 0, "TestModel", 9600) - mock_creat_status_file.assert_called_with("Custom Baudrate Status", "TestModel") - - @patch("sierra_status.src.usb_handle.get_module_status") - @patch("sierra_status.src.usb_handle.creat_status_file") - @patch("sierra_status.src.usb_handle.logging.error") - def test_start_process_empty_result_with_search( - self, mock_logging_error, mock_creat_status_file, mock_get_module_status - ) -> None: - mock_get_module_status.return_value = "" - start_process("COM1", "TestModel", logging.INFO, 1) - mock_get_module_status.assert_called_with( - "COM1", 1, "TestModel", DEFAULT_BAUDRATE - ) - mock_logging_error.assert_called_with("No result received from the module.") - mock_creat_status_file.assert_not_called() - - -class TestAnimateSpinnerAdvanced(unittest.TestCase): - - @patch("sys.stdout") - @patch("time.sleep") - def test_animate_spinner_interruption(self, mock_sleep, mock_stdout) -> None: - mock_sleep.side_effect = KeyboardInterrupt() - with self.assertRaises(KeyboardInterrupt): - animate_spinner() - self.assertEqual(mock_stdout.write.call_count, 1) - self.assertEqual(mock_stdout.flush.call_count, 1) - - -@patch("sierra_status.src.usb_handle.serial.Serial") -def test_send_at_command_invalid_port(self, mock_serial) -> None: - mock_serial.side_effect = serial.SerialException("Invalid port") - result = send_at_command("INVALID_PORT", "AT+TEST") - self.assertEqual(result, "") +if __name__ == "__main__": + unittest.main() From b6e0625e16717bbdfbd5e6baaad463cb15a1e59e Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 14:39:58 +0300 Subject: [PATCH 03/10] feat(ci): add Black code formatter and enable linting - Add Black to the pip install command alongside flake8 - Uncomment and update the linting step to use Black for code style checking - Replace flake8 linting with Black's --check option This change enhances the CI pipeline by introducing code formatting checks, ensuring consistent code style across the project. --- .github/workflows/test.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3a4b791..d15c003 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -30,10 +30,10 @@ jobs: run: | python -m pip install --upgrade pip pip install -r requirements.txt - pip install flake8 + pip install flake8 black - # - name: Run linting - # run: flake8 . + - name: Run linting + run: black --check . - name: Run tests with unittest id: run-tests From 481528c22d1be23ed3db7b26cecc6641f3472c08 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 15:15:12 +0300 Subject: [PATCH 04/10] style(formatting): apply black code formatter across project - Apply consistent formatting to setup.py, cli.py, usb_handle.py, and test_usb_handle.py - Improve code readability by breaking long lines and adjusting indentation - Remove unnecessary blank lines and add spacing where needed - Standardize string quotes (use double quotes consistently) - Adjust import order in some files This commit improves code consistency and adheres to PEP 8 style guidelines, making the codebase more maintainable and easier to read. --- setup.py | 2 +- sierra_status/__version__.py | 2 +- sierra_status/src/cli.py | 55 ++++++++++++++++++----- sierra_status/src/usb_handle.py | 35 +++++++++------ tests/test_usb_handle.py | 79 ++++++++++++++++----------------- 5 files changed, 108 insertions(+), 65 deletions(-) diff --git a/setup.py b/setup.py index 8a82854..8ca2e41 100644 --- a/setup.py +++ b/setup.py @@ -3,6 +3,7 @@ This setup configuration includes the package metadata, such as the name, version, description, author, license, and URLs for the project. It also specifies the required Python version, the packages to be included, and the console script entry point. """ + from sierra_status.__version__ import __version__ from setuptools import setup, find_packages import os @@ -59,4 +60,3 @@ }, python_requires=">=3.8", # Requires Python 3.8 and above ) - diff --git a/sierra_status/__version__.py b/sierra_status/__version__.py index 51e0a06..bbab024 100644 --- a/sierra_status/__version__.py +++ b/sierra_status/__version__.py @@ -1 +1 @@ -__version__ = "0.1.4" \ No newline at end of file +__version__ = "0.1.4" diff --git a/sierra_status/src/cli.py b/sierra_status/src/cli.py index 60b59c2..88f8adf 100644 --- a/sierra_status/src/cli.py +++ b/sierra_status/src/cli.py @@ -8,6 +8,7 @@ DEFAULT_BAUDRATE = 115200 + def setup_logging(verbose: bool) -> None: """ Sets up the logging configuration based on the provided verbosity level. @@ -21,6 +22,7 @@ def setup_logging(verbose: bool) -> None: log_level = logging.DEBUG if verbose else logging.INFO logging.basicConfig(level=log_level) + def validate_port(port: str) -> None: """ Validates that the specified USB port exists on the system. @@ -34,6 +36,7 @@ def validate_port(port: str) -> None: if not os.path.exists(port): raise ValueError(f"The specified port '{port}' does not exist.") + def main() -> None: """ The main entry point for the Sierra Wireless EM9xxx/EM7xxx CLI tool. @@ -53,18 +56,43 @@ def main() -> None: parser = argparse.ArgumentParser( description="CLI tool for Sierra Wireless EM9xxx/EM7xxx modules to query status", - formatter_class=argparse.RawTextHelpFormatter + formatter_class=argparse.RawTextHelpFormatter, ) - required = parser.add_argument_group('required arguments') - required.add_argument("-p", "--port", help="USB port to use (e.g., 'COM1' for Windows or '/dev/ttyUSB2' for Linux)", required=True) + required = parser.add_argument_group("required arguments") + required.add_argument( + "-p", + "--port", + help="USB port to use (e.g., 'COM1' for Windows or '/dev/ttyUSB2' for Linux)", + required=True, + ) - optional = parser.add_argument_group('optional arguments') - optional.add_argument("--version", help="Show version", action="version", version=f"%(prog)s {__version__}") - optional.add_argument("-m", "--model", help="Model of the device to add to filename (e.g., EM9191 or EM7455)", default="") - optional.add_argument("-v", "--verbose", help="Enable verbose output", action="store_true") - optional.add_argument("-s", "--search", help="Search for network using AT!COPS=?", action="store_true") - optional.add_argument("-b", "--baudrate", help=f"Baudrate to use for serial communication (default: {DEFAULT_BAUDRATE})", default=DEFAULT_BAUDRATE, type=int) + optional = parser.add_argument_group("optional arguments") + optional.add_argument( + "--version", + help="Show version", + action="version", + version=f"%(prog)s {__version__}", + ) + optional.add_argument( + "-m", + "--model", + help="Model of the device to add to filename (e.g., EM9191 or EM7455)", + default="", + ) + optional.add_argument( + "-v", "--verbose", help="Enable verbose output", action="store_true" + ) + optional.add_argument( + "-s", "--search", help="Search for network using AT!COPS=?", action="store_true" + ) + optional.add_argument( + "-b", + "--baudrate", + help=f"Baudrate to use for serial communication (default: {DEFAULT_BAUDRATE})", + default=DEFAULT_BAUDRATE, + type=int, + ) args = parser.parse_args() @@ -72,10 +100,17 @@ def main() -> None: try: validate_port(args.port) - usb_handle.start_process(args.port, args.model.lower(), logging.getLogger().level, args.search, args.baudrate) + usb_handle.start_process( + args.port, + args.model.lower(), + logging.getLogger().level, + args.search, + args.baudrate, + ) except Exception as e: logging.error(f"An error occurred: {str(e)}") sys.exit(1) + if __name__ == "__main__": main() diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index 7802fe2..98326bf 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -17,9 +17,9 @@ def animate_spinner() -> None: """ Animates a simple spinner character to indicate an ongoing operation. """ - chars = '|/-\\' + chars = "|/-\\" for char in chars: - sys.stdout.write(f'\rReading {char}') + sys.stdout.write(f"\rReading {char}") sys.stdout.flush() time.sleep(0.05) @@ -59,7 +59,7 @@ def send_at_command( if "OK\r\n" in result or "ERROR\r\n" in result: break animate_spinner() - + except serial.SerialException as e: logging.error(f"Serial communication error: {e}") except ValueError as e: @@ -67,28 +67,33 @@ def send_at_command( except Exception as e: logging.error(f"Unexpected error: {e}") finally: - sys.stdout.write('\r' + ' ' * 20 + '\r') # Clear the spinner line + sys.stdout.write("\r" + " " * 20 + "\r") # Clear the spinner line sys.stdout.flush() return "\n".join(line.strip() for line in result.splitlines() if line.strip()) -def get_module_status(port: str, search: int, model: str, baudrate: int = 115200) -> str: +def get_module_status( + port: str, search: int, model: str, baudrate: int = 115200 +) -> str: """ Retrieves the status of an module using AT commands. - + Args: port (str): The serial port to use. search (int): A flag indicating whether to retrieve additional status information using the AT+COPS command. model (str): The model of the module. baudrate (int, optional): The baud rate to use for the serial connection. Defaults to 115200. - + Returns: str: The status information retrieved from the module. """ result = "" try: commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS - result = "\n\n".join(send_at_command(port, command, baudrate=baudrate).strip() for command in commands) + result = "\n\n".join( + send_at_command(port, command, baudrate=baudrate).strip() + for command in commands + ) if search: result += f"\n\n{get_em_cops(port)}" except Exception as e: @@ -99,11 +104,11 @@ def get_module_status(port: str, search: int, model: str, baudrate: int = 115200 def get_em_cops(port: str, baudrate: int = DEFAULT_BAUDRATE) -> str: """ Retrieves the status of an EM9xxx module using the AT+COPS command. - + Args: port (str): The serial port to use. baudrate (int, optional): The baud rate to use for the serial connection. Defaults to the DEFAULT_BAUDRATE. - + Returns: str: The status information retrieved from the module. """ @@ -137,9 +142,13 @@ def start_process( """ Main function to retrieve the status of an EM9xxx module using AT commands. """ - logging.basicConfig(level=log_level, format='%(asctime)s - %(levelname)s - %(message)s') - logging.info(f"Starting process for port {port} with model {model} and baudrate {baudrate}") - result = get_module_status(port, search, model, baudrate) + logging.basicConfig( + level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" + ) + logging.info( + f"Starting process for port {port} with model {model} and baudrate {baudrate}" + ) + result = get_module_status(port, search, model, baudrate) if result: creat_status_file(result, model) else: diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index 590df3d..00f6b52 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -18,12 +18,11 @@ start_process, ) -# Test data EXPECTED_COMMANDS = ["ATI", "AT+CMEE=1", "AT!GSTATUS?", "AT+CPIN?"] ENTER_CND_COMMAND = 'AT!ENTERCND="A710"' -class TestATCommands(unittest.TestCase): +class TestATCommands(unittest.TestCase): def test_at_commands_properties(self) -> None: for command_list in [AT_COMMANDS, AT_COMMANDS_HL78]: with self.subTest(command_list=command_list): @@ -40,8 +39,8 @@ def test_specific_commands_present(self) -> None: def test_enter_cnd_command_format(self) -> None: self.assertIn(ENTER_CND_COMMAND, AT_COMMANDS) -class TestUSBHandle(unittest.TestCase): +class TestUSBHandle(unittest.TestCase): def setUp(self) -> None: self.mock_port = "COM1" self.mock_command = "AT+TEST" @@ -62,7 +61,7 @@ def test_send_at_command_exception(self) -> None: result = send_at_command(self.mock_port, self.mock_command) self.assertEqual(result, "") - @patch('sierra_status.src.usb_handle.send_at_command') + @patch("sierra_status.src.usb_handle.send_at_command") def test_get_module_status_without_search( self, mock_send_at_command: MagicMock ) -> None: @@ -70,8 +69,8 @@ def test_get_module_status_without_search( result = get_module_status(self.mock_port, 0, "EM9xxx") self.assertIn("Test Result", result) - @patch('sierra_status.src.usb_handle.send_at_command') - @patch('sierra_status.src.usb_handle.get_em_cops') + @patch("sierra_status.src.usb_handle.send_at_command") + @patch("sierra_status.src.usb_handle.get_em_cops") def test_get_module_status_with_search( self, mock_get_em_cops: MagicMock, mock_send_at_command: MagicMock ) -> None: @@ -81,22 +80,22 @@ def test_get_module_status_with_search( self.assertIn("Test Result", result) self.assertIn("COPS Result", result) - @patch('sierra_status.src.usb_handle.send_at_command') + @patch("sierra_status.src.usb_handle.send_at_command") def test_get_em_cops(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "COPS Test Result" result = get_em_cops(self.mock_port) self.assertEqual(result, "COPS Test Result") - @patch('builtins.open', new_callable=mock_open) - @patch('sierra_status.src.usb_handle.time.strftime') + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") def test_creat_status_file(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" creat_status_file("Test Status", "TestModel") mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") mock_file().write.assert_called_with("Test Status") - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") def test_start_process_with_result( self, mock_creat_status_file, mock_get_module_status ) -> None: @@ -104,8 +103,8 @@ def test_start_process_with_result( start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_called_with("Test Status", "TestModel") - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") def test_start_process_without_result( self, mock_creat_status_file, mock_get_module_status ) -> None: @@ -113,8 +112,8 @@ def test_start_process_without_result( start_process(self.mock_port, "TestModel", logging.INFO, 0) mock_creat_status_file.assert_not_called() -class TestAnimateSpinner(unittest.TestCase): +class TestAnimateSpinner(unittest.TestCase): def test_animate_spinner(self) -> None: with patch("sys.stdout") as mock_stdout, patch("time.sleep") as mock_sleep: animate_spinner() @@ -133,57 +132,56 @@ def test_animate_spinner_interruption(self) -> None: class TestSendATCommand(unittest.TestCase): - - @patch('sierra_status.src.usb_handle.serial.Serial') - @patch('sierra_status.src.usb_handle.time.time') + @patch("sierra_status.src.usb_handle.serial.Serial") + @patch("sierra_status.src.usb_handle.time.time") def test_send_at_command_timeout(self, mock_time, mock_serial) -> None: mock_time.side_effect = [0, 61] # Simulate timeout - mock_serial.return_value.read.return_value = b'' + mock_serial.return_value.read.return_value = b"" result = send_at_command("COM1", "AT+TEST", timeout=60) self.assertEqual(result, "") - @patch('sierra_status.src.usb_handle.serial.Serial') + @patch("sierra_status.src.usb_handle.serial.Serial") def test_send_at_command_error_response(self, mock_serial) -> None: - mock_serial.return_value.read_until.return_value = b'ERROR\r\n' + mock_serial.return_value.read_until.return_value = b"ERROR\r\n" result = send_at_command("COM1", "AT+TEST") self.assertEqual(result, "") -class TestGetModuleStatus(unittest.TestCase): - @patch('sierra_status.src.usb_handle.send_at_command') +class TestGetModuleStatus(unittest.TestCase): + @patch("sierra_status.src.usb_handle.send_at_command") def test_get_module_status_exception(self, mock_send_at_command) -> None: mock_send_at_command.side_effect = Exception("Test exception") result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result, "") - @patch('sierra_status.src.usb_handle.send_at_command') + @patch("sierra_status.src.usb_handle.send_at_command") def test_get_module_status_all_commands(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "EM9xxx") self.assertEqual(result.count("OK"), len(AT_COMMANDS)) - @patch('sierra_status.src.usb_handle.send_at_command') + @patch("sierra_status.src.usb_handle.send_at_command") def test_get_module_status_hl78xx(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "OK" result = get_module_status("COM1", 0, "HL78xx") self.assertEqual(result.count("OK"), len(AT_COMMANDS_HL78)) -class TestCreatStatusFile(unittest.TestCase): - @patch('builtins.open', new_callable=mock_open) - @patch('sierra_status.src.usb_handle.time.strftime') +class TestCreatStatusFile(unittest.TestCase): + @patch("builtins.open", new_callable=mock_open) + @patch("sierra_status.src.usb_handle.time.strftime") def test_creat_status_file_exception(self, mock_strftime, mock_file) -> None: mock_strftime.return_value = "20230101_120000" mock_file.side_effect = Exception("Test exception") - with self.assertLogs(level='ERROR') as log: + with self.assertLogs(level="ERROR") as log: creat_status_file("Test Status", "TestModel") self.assertIn("Error creating status file", log.output[0]) -class TestStartProcess(unittest.TestCase): - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - @patch('sierra_status.src.usb_handle.logging.basicConfig') +class TestStartProcess(unittest.TestCase): + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.basicConfig") def test_start_process_log_level( self, mock_basicConfig, mock_creat_status_file, mock_get_module_status ) -> None: @@ -193,9 +191,9 @@ def test_start_process_log_level( level=logging.DEBUG, format="%(asctime)s - %(levelname)s - %(message)s" ) - @patch('sierra_status.src.usb_handle.get_module_status') - @patch('sierra_status.src.usb_handle.creat_status_file') - @patch('sierra_status.src.usb_handle.logging.error') + @patch("sierra_status.src.usb_handle.get_module_status") + @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.logging.error") def test_start_process_no_result( self, mock_logging_error, mock_creat_status_file, mock_get_module_status ) -> None: @@ -204,8 +202,8 @@ def test_start_process_no_result( mock_logging_error.assert_called_with("No result received from the module.") mock_creat_status_file.assert_not_called() -class TestSendATCommandAdvanced(unittest.TestCase): +class TestSendATCommandAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.serial.Serial") def test_send_at_command_invalid_port(self, mock_serial) -> None: mock_serial.side_effect = serial.SerialException("Invalid port") @@ -224,8 +222,8 @@ def test_send_at_command_invalid_baudrate(self) -> None: with self.assertRaises(ValueError): send_at_command("COM1", "AT+TEST", baudrate=0) -class TestGetModuleStatusAdvanced(unittest.TestCase): +class TestGetModuleStatusAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.send_at_command") def test_get_module_status_hl78xx_model(self, mock_send_at_command) -> None: mock_send_at_command.return_value = "HL78xx Response" @@ -244,15 +242,15 @@ def test_get_module_status_unknown_model(self, mock_send_at_command) -> None: @patch("sierra_status.src.usb_handle.get_em_cops") def test_get_module_status_with_search_exception( self, mock_get_em_cops, mock_send_at_command - ): + ) -> None: mock_send_at_command.return_value = "Test Result" mock_get_em_cops.side_effect = Exception("COPS Error") result = get_module_status("COM1", 1, "EM9xxx") self.assertIn("Test Result", result) self.assertNotIn("COPS Error", result) -class TestGetEmCopsAdvanced(unittest.TestCase): +class TestGetEmCopsAdvanced(unittest.TestCase): @patch("sierra_status.src.usb_handle.send_at_command") def test_get_em_cops_timeout(self, mock_send_at_command) -> None: mock_send_at_command.side_effect = TimeoutError("Command timed out") @@ -266,8 +264,8 @@ def test_get_em_cops_custom_baudrate(self, mock_send_at_command) -> None: mock_send_at_command.assert_called_with("COM1", AT_COMMAND_COPS, 120, 9600) self.assertEqual(result, "Custom Baudrate Result") -class TestCreatStatusFileAdvanced(unittest.TestCase): +class TestCreatStatusFileAdvanced(unittest.TestCase): @patch("builtins.open", new_callable=mock_open) @patch("sierra_status.src.usb_handle.time.strftime") def test_creat_status_file_unicode_content(self, mock_strftime, mock_file) -> None: @@ -277,5 +275,6 @@ def test_creat_status_file_unicode_content(self, mock_strftime, mock_file) -> No mock_file.assert_called_with("status_TestModel_20230101_120000.txt", "w") mock_file().write.assert_called_with(unicode_content) + if __name__ == "__main__": unittest.main() From 794f5379a5145ca4030c62c2bc1309902058950b Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 16:49:17 +0300 Subject: [PATCH 05/10] feat(usb_handle): enhance status logging and timing information - Add start and finish timestamps to status output - Include total script execution time in log - Import 'log' function from math module (unused in this change) This commit improves the diagnostic information provided by the status retrieval process, making it easier to track when operations started and completed, as well as the overall runtime of the script. --- sierra_status/src/usb_handle.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index 98326bf..b553de2 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -1,3 +1,4 @@ +from math import log import sys import time import serial @@ -87,7 +88,7 @@ def get_module_status( Returns: str: The status information retrieved from the module. """ - result = "" + result = f"Starting time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())}\n" try: commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS result = "\n\n".join( @@ -128,6 +129,10 @@ def creat_status_file(result: str, model: str) -> None: """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) + result = ( + f"Finished time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())}\n" + + result + ) file_name = STATUS_FILE_PATTERN.format(model=model, timestamp=time_stamp) with open(file_name, "w") as f: f.write(result) @@ -142,6 +147,8 @@ def start_process( """ Main function to retrieve the status of an EM9xxx module using AT commands. """ + # add a total time counter for run this script + start_time = time.time() logging.basicConfig( level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" ) @@ -153,3 +160,6 @@ def start_process( creat_status_file(result, model) else: logging.error("No result received from the module.") + logging.info( + f"Total time for running this script: {time.time() - start_time:.2f} seconds" + ) From f56574aaa90e8cebd529e607a5cd67c624dce929 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 16:49:43 +0300 Subject: [PATCH 06/10] feat(usb_handle): enhance status logging and timing information - Add start and finish timestamps to status output - Include total script execution time in log - Import 'log' function from math module (unused in this change) This commit improves the diagnostic information provided by the status retrieval process, making it easier to track when operations started and completed, as well as the overall runtime of the script. --- sierra_status/src/usb_handle.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index b553de2..f2f4284 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -1,4 +1,3 @@ -from math import log import sys import time import serial From 3278700e3d6457f1ab3e1cb857579a0d1a2caca4 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 17:15:48 +0300 Subject: [PATCH 07/10] refactor(usb_handle): improve logging and documentation - Remove timestamp from initial result string in get_module_status() - Add detailed docstring for creat_status_file() function - Enhance start_process() function with improved documentation and logging - Move start time logging from result string to logging.info() call This commit improves code readability and maintainability by enhancing documentation and refining the logging approach. --- sierra_status/src/usb_handle.py | 24 +++++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index f2f4284..ac914a6 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -87,7 +87,7 @@ def get_module_status( Returns: str: The status information retrieved from the module. """ - result = f"Starting time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())}\n" + result = "" try: commands = AT_COMMANDS_HL78 if model.lower() == "hl78xx" else AT_COMMANDS result = "\n\n".join( @@ -125,6 +125,13 @@ def get_em_cops(port: str, baudrate: int = DEFAULT_BAUDRATE) -> str: def creat_status_file(result: str, model: str) -> None: """ Creates a status file with the provided result. + + Args: + result (str): The status information to be written to the file. + model (str): The model of the module. + + Returns: + None """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) @@ -145,14 +152,25 @@ def start_process( ) -> None: """ Main function to retrieve the status of an EM9xxx module using AT commands. + + Args: + port (str): The serial port to use. + model (str): The model of the module. + log_level (int): The logging level to use. + search (int): The search parameter to use. + baudrate (int, optional): The baud rate to use for the serial connection. Defaults to the DEFAULT_BAUDRATE. + + Returns: + None """ - # add a total time counter for run this script start_time = time.time() logging.basicConfig( level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" ) logging.info( - f"Starting process for port {port} with model {model} and baudrate {baudrate}" + f"""Start time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())} + Starting process for port {port} + with model {model} and baudrate {baudrate}""" ) result = get_module_status(port, search, model, baudrate) if result: From 3ae6b5197db2160a138f31a77a8549f8a454a3d4 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 17:23:43 +0300 Subject: [PATCH 08/10] refactor(usb_handle): simplify timestamp formatting in creat_status_file function - Replace separate time.strftime() call with reuse of existing time_stamp variable - Improve code readability and reduce redundancy in result string formatting - Ensure consistent timestamp format throughout the status file creation process --- sierra_status/src/usb_handle.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index ac914a6..5234032 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -135,10 +135,7 @@ def creat_status_file(result: str, model: str) -> None: """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) - result = ( - f"Finished time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())}\n" - + result - ) + result = f"Finished time: {time_stamp}\n" + result file_name = STATUS_FILE_PATTERN.format(model=model, timestamp=time_stamp) with open(file_name, "w") as f: f.write(result) From d50f7a7fba94eb043ef2fb7f1e00037b4f8ef48f Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 17:27:21 +0300 Subject: [PATCH 09/10] update linting issue --- sierra_status/src/usb_handle.py | 1 - 1 file changed, 1 deletion(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index 77cb2df..be60aaf 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -170,7 +170,6 @@ def start_process( f"""Start time: {time.strftime('%Y-%m-%d_%H:%M:%S', time.localtime())} Starting process for port {port} with model {model} and baudrate {baudrate}""" - ) result = get_module_status(port, search, model, baudrate) if result: From 0181074b0669091d791605c7008f304c4fdbdbe7 Mon Sep 17 00:00:00 2001 From: elkanamol Date: Sun, 8 Sep 2024 17:50:45 +0300 Subject: [PATCH 10/10] refactor(usb_handle): improve timestamp handling and code organization - Move timestamp generation to start_process function - Add timestamp to result string before creating status file - Remove unnecessary blank lines - Update test case to reflect new timestamp handling --- sierra_status/src/usb_handle.py | 6 ++---- tests/test_usb_handle.py | 7 +++++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/sierra_status/src/usb_handle.py b/sierra_status/src/usb_handle.py index be60aaf..cd93cb1 100644 --- a/sierra_status/src/usb_handle.py +++ b/sierra_status/src/usb_handle.py @@ -59,7 +59,6 @@ def send_at_command( if "OK\r\n" in result or "ERROR\r\n" in result: break animate_spinner() - except serial.SerialException as e: logging.error(f"Serial communication error: {e}") except ValueError as e: @@ -135,8 +134,6 @@ def creat_status_file(result: str, model: str) -> None: """ try: time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) - result = f"Finished time: {time_stamp}\n" + result - file_name = STATUS_FILE_PATTERN.format(model=model, timestamp=time_stamp) with open(file_name, "w") as f: f.write(result) @@ -162,7 +159,6 @@ def start_process( None """ start_time = time.time() - logging.basicConfig( level=log_level, format="%(asctime)s - %(levelname)s - %(message)s" ) @@ -173,6 +169,8 @@ def start_process( ) result = get_module_status(port, search, model, baudrate) if result: + time_stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime()) + result = f"Finished time: {time_stamp}\n" + result creat_status_file(result, model) else: logging.error("No result received from the module.") diff --git a/tests/test_usb_handle.py b/tests/test_usb_handle.py index 00f6b52..0066985 100644 --- a/tests/test_usb_handle.py +++ b/tests/test_usb_handle.py @@ -96,12 +96,15 @@ def test_creat_status_file(self, mock_strftime, mock_file) -> None: @patch("sierra_status.src.usb_handle.get_module_status") @patch("sierra_status.src.usb_handle.creat_status_file") + @patch("sierra_status.src.usb_handle.time.strftime") def test_start_process_with_result( - self, mock_creat_status_file, mock_get_module_status + self, mock_strftime, mock_creat_status_file, mock_get_module_status ) -> None: mock_get_module_status.return_value = "Test Status" + mock_strftime.return_value = "20230101_120000" start_process(self.mock_port, "TestModel", logging.INFO, 0) - mock_creat_status_file.assert_called_with("Test Status", "TestModel") + expected_result = "Finished time: 20230101_120000\nTest Status" + mock_creat_status_file.assert_called_with(expected_result, "TestModel") @patch("sierra_status.src.usb_handle.get_module_status") @patch("sierra_status.src.usb_handle.creat_status_file")