From 716cbe89f988b4a768284518b41a3eec9b14557d Mon Sep 17 00:00:00 2001 From: YSevenK Date: Tue, 6 Aug 2024 17:52:29 +0800 Subject: [PATCH] fix bugs.add unittest workflow.reformat --- .../test_command_scene_configHelper.yml | 31 ++++ test/common/test_command.py | 95 +++++------ test/common/test_config_helper.py | 44 +++-- test/common/test_scene.py | 153 +++++++++++++----- 4 files changed, 205 insertions(+), 118 deletions(-) create mode 100644 .github/workflows/test_command_scene_configHelper.yml diff --git a/.github/workflows/test_command_scene_configHelper.yml b/.github/workflows/test_command_scene_configHelper.yml new file mode 100644 index 00000000..e5647e0a --- /dev/null +++ b/.github/workflows/test_command_scene_configHelper.yml @@ -0,0 +1,31 @@ +# common包下command、scene、config_helper的测试用例 +name: Test command_scene_configHelper + +on: + push: + branches: "*" + pull_request: + branches: "*" + +jobs: + build: + + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v3 + with: + fetch-depth: 0 # Fetch all history for proper version detection + + - name: Set up Python 3.8 + uses: actions/setup-python@v3 + with: + python-version: 3.8 + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -r requirements3.txt + + - name: Run tests + run: python -m unittest discover -s test/common -p 'test_*.py' diff --git a/test/common/test_command.py b/test/common/test_command.py index dce787ae..ac78f06e 100644 --- a/test/common/test_command.py +++ b/test/common/test_command.py @@ -18,7 +18,7 @@ import unittest from unittest.mock import Mock, patch import subprocess -from common.command import * +from common.command import * class TestLocalClient(unittest.TestCase): @@ -36,11 +36,11 @@ def test_run_success(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证结果 self.assertEqual(result, b'success') @@ -53,11 +53,11 @@ def test_run_failure(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证错误处理 self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost, stderr=[b'error']") self.assertEqual(result, b'') @@ -69,13 +69,12 @@ def test_run_exception(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run(cmd) - + # 验证 verbose 调用和异常处理 self.stdio.verbose.assert_called_with("[local host] run cmd = [echo \"hello\"] on localhost") self.stdio.error.assert_called_with("run cmd = [echo \"hello\"] on localhost") self.assertIsNone(result) - - + @patch('subprocess.Popen') def test_run_get_stderr_success(self, mock_popen): # 模拟命令成功执行 @@ -85,15 +84,14 @@ def test_run_get_stderr_success(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run_get_stderr(cmd) - + # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证结果 self.assertEqual(result, b'') - @patch('subprocess.Popen') def test_run_get_stderr_failure(self, mock_popen): # 模拟命令执行失败 @@ -107,16 +105,15 @@ def test_run_get_stderr_failure(self, mock_popen): # 验证 verbose 和 Popen 调用 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") mock_popen.assert_called_with(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True, executable='/bin/bash') - + # 验证错误处理 # 因为 stdout 和 stderr 都是 b'',stderr 应该是 b'error' self.assertEqual(result, b'error') - + # 检查 error 方法是否被调用,且调用内容是否正确 # 注意:在正常情况下 error 方法不应该被调用,只有异常情况才会被调用。 # 确保 error 方法在异常情况下被调用 self.stdio.error.assert_not_called() - @patch('subprocess.Popen') def test_run_get_stderr_exception(self, mock_popen): @@ -125,18 +122,18 @@ def test_run_get_stderr_exception(self, mock_popen): cmd = 'echo "hello"' result = self.local_client.run_get_stderr(cmd) - + # 验证 verbose 调用和异常处理 self.stdio.verbose.assert_called_with("run cmd = [echo \"hello\"] on localhost") self.stdio.error.assert_called_with(f"run cmd = [{cmd}] on localhost") self.assertIsNone(result) - + def test_download_file_success(self): remote_path = "/remote/path/file.txt" local_path = "/local/path/file.txt" - + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) - + self.ssh_client.download.assert_called_once_with(remote_path, local_path) self.assertEqual(result, local_path) self.stdio.error.assert_not_called() @@ -145,82 +142,78 @@ def test_download_file_success(self): def test_download_file_failure(self): remote_path = "/remote/path/file.txt" local_path = "/local/path/file.txt" - + self.ssh_client.download.side_effect = Exception("Simulated download exception") - + result = download_file(self.ssh_client, remote_path, local_path, self.stdio) - + self.ssh_client.download.assert_called_once_with(remote_path, local_path) self.assertEqual(result, local_path) self.stdio.error.assert_called_once_with("Download File Failed error: Simulated download exception") self.stdio.verbose.assert_called_once() - + def test_upload_file_success(self): local_path = "/local/path/file.txt" remote_path = "/remote/path/file.txt" self.ssh_client.get_name.return_value = "test_server" - + result = upload_file(self.ssh_client, local_path, remote_path, self.stdio) - + self.ssh_client.upload.assert_called_once_with(remote_path, local_path) - self.stdio.verbose.assert_called_once_with( - "Please wait a moment, upload file to server test_server, local file path /local/path/file.txt, remote file path /remote/path/file.txt" - ) - self.stdio.error.assert_not_called() - + self.stdio.verbose.assert_called_once_with("Please wait a moment, upload file to server test_server, local file path /local/path/file.txt, remote file path /remote/path/file.txt") + self.stdio.error.assert_not_called() + def test_rm_rf_file_success(self): dir_path = "/path/to/delete" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - - self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete") - + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete") + def test_rm_rf_file_empty_dir(self): dir_path = "" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - - self.ssh_client.exec_cmd.assert_called_once_with("rm -rf ") - + + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf ") + def test_rm_rf_file_special_chars(self): dir_path = "/path/to/delete; echo 'This is a test'" - + rm_rf_file(self.ssh_client, dir_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/delete; echo 'This is a test'") def test_delete_file_in_folder_success(self): file_path = "/path/to/gather_pack" - + delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack/*") - def test_delete_file_in_folder_none_path(self): file_path = None - + with self.assertRaises(Exception) as context: delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.assertTrue("Please check file path, None" in str(context.exception)) - + def test_delete_file_in_folder_invalid_path(self): file_path = "/path/to/invalid_folder" - + with self.assertRaises(Exception) as context: delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.assertTrue("Please check file path, /path/to/invalid_folder" in str(context.exception)) - def test_delete_file_in_folder_special_chars(self): file_path = "/path/to/gather_pack; echo 'test'" - + delete_file_in_folder(self.ssh_client, file_path, self.stdio) - + self.ssh_client.exec_cmd.assert_called_once_with("rm -rf /path/to/gather_pack; echo 'test'/*") - + if __name__ == '__main__': unittest.main() diff --git a/test/common/test_config_helper.py b/test/common/test_config_helper.py index 747a7c58..0137dd73 100644 --- a/test/common/test_config_helper.py +++ b/test/common/test_config_helper.py @@ -19,6 +19,7 @@ from unittest import mock from common.config_helper import ConfigHelper + class TestConfigHelper(unittest.TestCase): @mock.patch('common.config_helper.YamlUtils.write_yaml_data') @mock.patch('common.config_helper.DirectoryUtil.mkdir') @@ -27,48 +28,39 @@ class TestConfigHelper(unittest.TestCase): def test_save_old_configuration(self, mock_timestamp_to_filename_time, mock_expanduser, mock_mkdir, mock_write_yaml_data): # 模拟时间戳生成函数,返回一个特定的值 mock_timestamp_to_filename_time.return_value = '20240806_123456' - + # 模拟路径扩展函数 def mock_expanduser_path(path): - return { - '~/.obdiag/config.yml': '/mock/config.yml', - '~/mock/backup/dir': '/mock/backup/dir' - }.get(path, path) # 默认返回原路径 - + return {'~/.obdiag/config.yml': '/mock/config.yml', '~/mock/backup/dir': '/mock/backup/dir'}.get(path, path) # 默认返回原路径 + mock_expanduser.side_effect = mock_expanduser_path - + # 模拟目录创建函数 mock_mkdir.return_value = None # 模拟YAML数据写入函数 mock_write_yaml_data.return_value = None - + # 创建一个模拟的上下文对象 context = mock.MagicMock() - context.inner_config = { - "obdiag": { - "basic": { - "config_backup_dir": "~/mock/backup/dir" - } - } - } - + context.inner_config = {"obdiag": {"basic": {"config_backup_dir": "~/mock/backup/dir"}}} + # 初始化ConfigHelper对象 config_helper = ConfigHelper(context) # 定义一个示例配置 sample_config = {'key': 'value'} - + # 调用需要测试的方法 config_helper.save_old_configuration(sample_config) - + # 验证路径扩展是否被正确调用 mock_expanduser.assert_any_call('~/.obdiag/config.yml') mock_expanduser.assert_any_call('~/mock/backup/dir') - + # 验证目录创建是否被正确调用 mock_mkdir.assert_called_once_with(path='/mock/backup/dir') - + # 验证YAML数据写入是否被正确调用 expected_backup_path = '/mock/backup/dir/config_backup_20240806_123456.yml' mock_write_yaml_data.assert_called_once_with(sample_config, expected_backup_path) @@ -99,7 +91,7 @@ def test_input_with_default(self, mock_input): mock_input.return_value = 'custom_user' result = config_helper.input_with_default('username', 'default_user') self.assertEqual(result, 'custom_user') - + # 测试带有默认值的密码输入方法 @mock.patch('common.config_helper.pwinput.pwinput') def test_input_password_with_default(self, mock_pwinput): @@ -111,7 +103,7 @@ def test_input_password_with_default(self, mock_pwinput): mock_pwinput.return_value = '' result = config_helper.input_password_with_default("password", "default_password") self.assertEqual(result, "default_password") - + # 测试密码输入为'y'的情况,应该返回默认值 mock_pwinput.return_value = 'y' result = config_helper.input_password_with_default("password", "default_password") @@ -125,7 +117,7 @@ def test_input_password_with_default(self, mock_pwinput): # 测试密码输入为其他值的情况,应该返回输入值 mock_pwinput.return_value = 'custom_password' result = config_helper.input_password_with_default("password", "default_password") - self.assertEqual(result, "custom_password") + self.assertEqual(result, "custom_password") # 测试带有默认选项的选择输入方法 @mock.patch('common.config_helper.input') @@ -138,7 +130,7 @@ def test_input_choice_default(self, mock_input): mock_input.return_value = 'y' result = config_helper.input_choice_default("choice", "N") self.assertTrue(result) - + # 测试输入为'yes'的情况,应该返回True mock_input.return_value = 'yes' result = config_helper.input_choice_default("choice", "N") @@ -157,8 +149,8 @@ def test_input_choice_default(self, mock_input): # 测试输入为空字符串的情况,应该返回False mock_input.return_value = '' result = config_helper.input_choice_default("choice", "N") - self.assertFalse(result) + self.assertFalse(result) if __name__ == '__main__': - unittest.main() \ No newline at end of file + unittest.main() diff --git a/test/common/test_scene.py b/test/common/test_scene.py index 135447df..21ad57d3 100644 --- a/test/common/test_scene.py +++ b/test/common/test_scene.py @@ -11,64 +11,135 @@ # See the Mulan PSL v2 for more details. """ -@time: 2024/01/16 -@file: scene.py -@desc: +@time: 2024/8/6 +@file: test_scene.py +@desc: 为scene模块中filter_by_version和get_version_by_type函数进行单元测试 """ - import unittest -from unittest.mock import MagicMock -from common.scene import filter_by_version +from unittest.mock import MagicMock, patch +from common.scene import * class TestFilterByVersion(unittest.TestCase): def setUp(self): self.stdio = MagicMock() - self.scene = [{"version": "[1.0.0,2.0.0)"}, {"version": "(1.0.0,2.0.0]"}] - self.cluster = {"version": "1.5.0"} + StringUtils.compare_versions_greater = MagicMock() + self.context = MagicMock() + self.context.stdio = MagicMock() - def test_filter_by_version_with_valid_version(self): - # Test case where cluster version is within the range specified in the scene - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 1) + def test_no_version_in_cluster(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_with_invalid_version(self): - # Test case where cluster version is outside the range specified in the scene - self.cluster["version"] = "0.5.0" - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_empty_version_in_cluster(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": ""} + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, 0) - def test_filter_by_version_with_wildcard_min_version(self): - # Test case where min version is wildcard (*) and cluster version is valid - self.scene[0]["version"] = "[*,2.0.0)" - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 1) + def test_version_not_string(self): + scene = [{"version": 123}] + cluster = {"version": "1.5"} + with self.assertRaises(Exception): + filter_by_version(scene, cluster, self.stdio) - def test_filter_by_version_with_wildcard_max_version(self): - # Test case where max version is wildcard (*) and cluster version is valid - self.scene[1]["version"] = "(1.0.0,*]" - result = filter_by_version(self.scene, self.cluster, self.stdio) - self.assertEqual(result, 2) + def test_version_match_min(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "1.0"} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_with_non_string_version(self): - # Test case where version is not a string - self.scene[0]["version"] = str(1.0) - with self.assertRaises(Exception) as context: - filter_by_version(self.scene, self.cluster, self.stdio) - self.assertTrue("filter_by_version steps_version Exception" in str(context.exception)) + def test_version_match_max(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "2.0"} + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + def test_version_in_range(self): + scene = [{"version": "[1.0,2.0]"}] + cluster = {"version": "1.5"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) - def test_filter_by_version_no_version_in_cluster(self): - # Test case where version is not specified in the cluster - del self.cluster["version"] - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_version_out_of_range(self): + scene = [{"version": "[1.0,2.0]"}, {"version": "[2.0,3.0]"}] + cluster = {"version": "2.5"} + StringUtils.compare_versions_greater.side_effect = [False, True, True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 1) + + def test_no_version_in_steps(self): + scene = [{}] + cluster = {"version": "1.0"} + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, 0) - def test_filter_by_version_no_version_in_steps(self): - # Test case where no version is specified in any steps - self.scene = [{"some_key": "some_value"}] - result = filter_by_version(self.scene, self.cluster, self.stdio) + def test_no_matching_version(self): + scene = [{"version": "[1.0,2.0]"}, {"version": "[2.0,3.0]"}] + cluster = {"version": "3.5"} + StringUtils.compare_versions_greater.return_value = False + result = filter_by_version(scene, cluster, self.stdio) self.assertEqual(result, -1) + def test_wildcard_min_version(self): + scene = [{"version": "[*,2.0]"}] + cluster = {"version": "1.0"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + def test_wildcard_max_version(self): + scene = [{"version": "[1.0,*]"}] + cluster = {"version": "3.0"} + StringUtils.compare_versions_greater.side_effect = [True, True] + result = filter_by_version(scene, cluster, self.stdio) + self.assertEqual(result, 0) + + @patch('common.scene.get_observer_version') + def test_get_observer_version(self, mock_get_observer_version): + mock_get_observer_version.return_value = "1.0.0" + result = get_version_by_type(self.context, "observer") + self.assertEqual(result, "1.0.0") + mock_get_observer_version.assert_called_once_with(self.context) + + @patch('common.scene.get_observer_version') + def test_get_other_version(self, mock_get_observer_version): + mock_get_observer_version.return_value = "2.0.0" + result = get_version_by_type(self.context, "other") + self.assertEqual(result, "2.0.0") + mock_get_observer_version.assert_called_once_with(self.context) + + @patch('common.scene.get_observer_version') + def test_get_observer_version_fail(self, mock_get_observer_version): + mock_get_observer_version.side_effect = Exception("Observer error") + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "observer") + self.assertIn("can't get observer version", str(context.exception)) + self.context.stdio.warn.assert_called_once() + + @patch('common.scene.get_obproxy_version') + def test_get_obproxy_version(self, mock_get_obproxy_version): + mock_get_obproxy_version.return_value = "3.0.0" + result = get_version_by_type(self.context, "obproxy") + self.assertEqual(result, "3.0.0") + mock_get_obproxy_version.assert_called_once_with(self.context) + + def test_unsupported_type(self): + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "unsupported") + self.assertIn("No support to get the version", str(context.exception)) + + @patch('common.scene.get_observer_version') + def test_general_exception_handling(self, mock_get_observer_version): + mock_get_observer_version.side_effect = Exception("Unexpected error") + with self.assertRaises(Exception) as context: + get_version_by_type(self.context, "observer") + self.assertIn("can't get observer version", str(context.exception)) + self.context.stdio.exception.assert_called_once() + -if __name__ == "__main__": +if __name__ == '__main__': unittest.main()