From e0001673830acafd22d5ab07b12f4ae5db58f2b6 Mon Sep 17 00:00:00 2001 From: hj Date: Fri, 23 Aug 2024 04:59:29 +0530 Subject: [PATCH] Add django workers tests --- .../workers/code_upload_submission_worker.py | 4 +- .../test_code_upload_submission_worker.py | 1198 +++++++++++++++++ .../worker/test_remote_submission_worker.py | 424 ++++++ tests/unit/worker/test_statsd_utils.py | 61 + tests/unit/worker/test_submission_worker.py | 70 +- tests/unit/worker/test_worker_utils.py | 223 +++ 6 files changed, 1974 insertions(+), 6 deletions(-) create mode 100644 tests/unit/worker/test_code_upload_submission_worker.py create mode 100644 tests/unit/worker/test_remote_submission_worker.py create mode 100644 tests/unit/worker/test_statsd_utils.py create mode 100644 tests/unit/worker/test_worker_utils.py diff --git a/scripts/workers/code_upload_submission_worker.py b/scripts/workers/code_upload_submission_worker.py index 4013b4795f..0af706daca 100755 --- a/scripts/workers/code_upload_submission_worker.py +++ b/scripts/workers/code_upload_submission_worker.py @@ -10,8 +10,8 @@ # TODO: Add exception in all the commands from kubernetes.client.rest import ApiException -from statsd_utils import increment_and_push_metrics_to_statsd -from worker_utils import EvalAI_Interface +from .statsd_utils import increment_and_push_metrics_to_statsd +from .worker_utils import EvalAI_Interface class GracefulKiller: diff --git a/tests/unit/worker/test_code_upload_submission_worker.py b/tests/unit/worker/test_code_upload_submission_worker.py new file mode 100644 index 0000000000..669815307d --- /dev/null +++ b/tests/unit/worker/test_code_upload_submission_worker.py @@ -0,0 +1,1198 @@ +import yaml +from kubernetes import client +from kubernetes.client.rest import ApiException +import signal +import unittest +from unittest.mock import mock_open, patch, MagicMock +from scripts.workers.code_upload_submission_worker import ( + EVALAI_API_SERVER, + GracefulKiller, + create_config_map_object, + create_configmap, + create_job, + create_job_object, + create_script_config_map, + create_static_code_upload_submission_job_object, + delete_job, + get_api_client, + get_api_object, + get_config_map_volume_object, + get_core_v1_api_object, + get_empty_volume_object, + get_init_container, + get_job_constraints, + get_job_object, + get_pods_from_job, + get_submission_meta_update_curl, + get_volume_list, + get_volume_mount_list, + get_volume_mount_object, + install_gpu_drivers, + process_submission_callback, + update_failed_jobs_and_send_logs, + main, + read_job, +) + + +class TestGracefulKiller(unittest.TestCase): + + def test_exit_gracefully(self): + killer = GracefulKiller() + killer.exit_gracefully(signal.SIGINT, None) + self.assertTrue(killer.kill_now) + + +class TestGetVolumeMountObject(unittest.TestCase): + + def test_get_volume_mount_object(self): + mount_path = "/mnt/data" + name = "data-volume" + read_only = True + volume_mount = get_volume_mount_object(mount_path, name, read_only) + self.assertEqual(volume_mount.mount_path, mount_path) + self.assertEqual(volume_mount.name, name) + self.assertEqual(volume_mount.read_only, read_only) + + def test_get_volume_mount_list(self): + mount_path = "/mnt/data" + read_only = True + volume_mount_list = get_volume_mount_list(mount_path, read_only) + self.assertEqual(len(volume_mount_list), 1) + self.assertEqual(volume_mount_list[0].mount_path, mount_path) + self.assertEqual(volume_mount_list[0].name, "efs-claim") + self.assertEqual(volume_mount_list[0].read_only, read_only) + + def 
test_get_volume_list(self): + volume_list = get_volume_list() + self.assertEqual(len(volume_list), 1) + self.assertEqual(volume_list[0].name, "efs-claim") + self.assertEqual( + volume_list[0].persistent_volume_claim.claim_name, "efs-claim" + ) + + def test_get_empty_volume_object(self): + volume_name = "empty-volume" + volume = get_empty_volume_object(volume_name) + self.assertEqual(volume.name, volume_name) + self.assertIsInstance(volume.empty_dir, client.V1EmptyDirVolumeSource) + + def test_get_config_map_volume_object(self): + config_map_name = "config-map" + volume_name = "config-map-volume" + volume = get_config_map_volume_object(config_map_name, volume_name) + self.assertEqual(volume.name, volume_name) + self.assertIsInstance( + volume.config_map, client.V1ConfigMapVolumeSource + ) + self.assertEqual(volume.config_map.name, config_map_name) + + +class TestCreateConfigMapObject(unittest.TestCase): + @patch( + "kubernetes.client.V1ObjectMeta" + ) # Mock the kubernetes client V1ObjectMeta + @patch( + "kubernetes.client.V1ConfigMap" + ) # Mock the kubernetes client V1ConfigMap + @patch( + "builtins.open", new_callable=mock_open, read_data="file content" + ) # Mock open function + def test_create_config_map_object_success( + self, mock_open, MockV1ConfigMap, MockV1ObjectMeta + ): + # Define mock file paths + mock_file_paths = ["/path/to/file1.txt", "/path/to/file2.txt"] + + # Call the function + config_map = create_config_map_object( + "test-config-map", mock_file_paths + ) + + # Check if V1ObjectMeta was called with the right parameters + MockV1ObjectMeta.assert_called_once_with(name="test-config-map") + + # Check if V1ConfigMap was called with the right parameters + MockV1ConfigMap.assert_called_once_with( + api_version="v1", + kind="ConfigMap", + data={"file1.txt": "file content", "file2.txt": "file content"}, + metadata=MockV1ObjectMeta.return_value, + ) + + # Check if the correct ConfigMap object is returned + self.assertEqual(config_map, MockV1ConfigMap.return_value) + + @patch("kubernetes.client.V1ObjectMeta") + @patch("kubernetes.client.V1ConfigMap") + def test_create_config_map_object_empty_file_paths( + self, MockV1ConfigMap, MockV1ObjectMeta + ): + config_map = create_config_map_object("test-config-map", []) + + MockV1ObjectMeta.assert_called_once_with(name="test-config-map") + + MockV1ConfigMap.assert_called_once_with( + api_version="v1", + kind="ConfigMap", + data={}, + metadata=MockV1ObjectMeta.return_value, + ) + + self.assertEqual(config_map, MockV1ConfigMap.return_value) + + @patch("kubernetes.client.V1ObjectMeta") + @patch("kubernetes.client.V1ConfigMap") + @patch("builtins.open", side_effect=FileNotFoundError) + def test_create_config_map_object_file_not_found( + self, mock_open, MockV1ConfigMap, MockV1ObjectMeta + ): + with self.assertRaises(FileNotFoundError): + create_config_map_object( + "test-config-map", ["/path/to/nonexistent_file.txt"] + ) + + @patch("kubernetes.client.V1ObjectMeta") + @patch("kubernetes.client.V1ConfigMap") + @patch("builtins.open", new_callable=mock_open) + def test_create_config_map_object_file_read_error( + self, mock_open, MockV1ConfigMap, MockV1ObjectMeta + ): + mock_open.side_effect = IOError("Unable to read file") + with self.assertRaises(IOError): + create_config_map_object("test-config-map", ["/path/to/file.txt"]) + + +class TestCreateConfigMap(unittest.TestCase): + @patch("kubernetes.client.CoreV1Api") + def test_create_configmap_success(self, MockCoreV1Api): + # Mock the CoreV1Api instance + core_v1_api_instance = 
MockCoreV1Api.return_value + + # Define the mock config map + config_map = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-config-map"}, + "data": {"key1": "value1", "key2": "value2"}, + } + + # Call the function + create_configmap(core_v1_api_instance, config_map) + + # Check if the CoreV1Api method was called with the right parameters + core_v1_api_instance.create_namespaced_config_map.assert_called_once_with( + namespace="default", body=config_map + ) + + @patch("kubernetes.client.CoreV1Api") + def test_create_configmap_exception(self, MockCoreV1Api): + # Mock the CoreV1Api instance + core_v1_api_instance = MockCoreV1Api.return_value + + # Define the mock config map + config_map = { + "apiVersion": "v1", + "kind": "ConfigMap", + "metadata": {"name": "test-config-map"}, + "data": {"key1": "value1", "key2": "value2"}, + } + + # Mock the create_namespaced_config_map method to raise an exception + core_v1_api_instance.create_namespaced_config_map.side_effect = ( + Exception("Test exception") + ) + + # Call the function and check if the exception is logged + with patch( + "scripts.workers.code_upload_submission_worker.logger.exception" + ) as mock_logger: + try: + create_configmap(core_v1_api_instance, config_map) + except Exception as e: + mock_logger.assert_called_once_with( + f"Exception while creating configmap with error Test exception {e}" + ) + + # Ensure create_namespaced_config_map was called + core_v1_api_instance.create_namespaced_config_map.assert_called_once() + + +class TestConfigFunctions(unittest.TestCase): + @patch( + "scripts.workers.code_upload_submission_worker.create_config_map_object" + ) # Mock the create_config_map_object function + def test_create_script_config_map(self, MockCreateConfigMapObject): + # Mock the return value of create_config_map_object + mock_config_map = MagicMock() + MockCreateConfigMapObject.return_value = mock_config_map + + config_map_name = "test-config-map" + expected_file_paths = [ + "/code/scripts/workers/code_upload_worker_utils/make_submission.sh", + "/code/scripts/workers/code_upload_worker_utils/monitor_submission.sh", + ] + + # Call the function + result = create_script_config_map(config_map_name) + + # Assert create_config_map_object was called with the correct parameters + MockCreateConfigMapObject.assert_called_once_with( + config_map_name, expected_file_paths + ) + + # Assert the return value is as expected + self.assertEqual(result, mock_config_map) + + @patch( + "scripts.workers.code_upload_submission_worker.EVALAI_API_SERVER", + "http://example.com", + ) # Mock the EVALAI_API_SERVER + @patch( + "scripts.workers.code_upload_submission_worker.AUTH_TOKEN", + "test-auth-token", + ) # Mock the AUTH_TOKEN + def test_get_submission_meta_update_curl(self): + submission_pk = 123 + expected_url = ( + "http://example.com/api/jobs/submission/123/update_started_at/" + ) + expected_curl_request = "curl --location --request PATCH '{}' --header 'Authorization: Bearer {}'".format( + expected_url, "test-auth-token" + ) + + # Call the function + result = get_submission_meta_update_curl(submission_pk) + + # Assert the result matches the expected curl command + self.assertEqual(result, expected_curl_request) + + @patch( + "scripts.workers.code_upload_submission_worker.client.V1Job" + ) # Replace 'scripts.workers.code_upload_submission_worker' with the actual module name + @patch( + "scripts.workers.code_upload_submission_worker.client.V1ObjectMeta" + ) # Replace 'scripts.workers.code_upload_submission_worker' with the 
actual module name + def test_get_job_object(self, MockV1ObjectMeta, MockV1Job): + # Arrange + submission_pk = 123 + spec = ( + MagicMock() + ) # Create a mock for V1JobSpec or provide a mock spec if you have one + expected_name = "submission-123" + + # Configure the mocks + MockV1ObjectMeta.return_value = MagicMock() + MockV1Job.return_value = MagicMock() + + # Act + result = get_job_object(submission_pk, spec) + + # Assert + MockV1ObjectMeta.assert_called_once_with(name=expected_name) + MockV1Job.assert_called_once_with( + api_version="batch/v1", + kind="Job", + metadata=MockV1ObjectMeta.return_value, + spec=spec, + ) + self.assertEqual(result, MockV1Job.return_value) + + @patch( + "scripts.workers.code_upload_submission_worker.client.V1Container" + ) # Replace 'scripts.workers.code_upload_submission_worker' with the actual module name + @patch( + "scripts.workers.code_upload_submission_worker.get_submission_meta_update_curl" + ) + def test_get_init_container( + self, MockGetSubmissionMetaUpdateCurl, MockV1Container + ): + # Arrange + submission_pk = 123 + curl_request = "curl_command" # Example curl command + MockGetSubmissionMetaUpdateCurl.return_value = curl_request + + expected_command = [ + "/bin/bash", + "-c", + "apt update && apt install -y curl && curl_command", + ] + + MockV1Container.return_value = MagicMock() + + # Act + result = get_init_container(submission_pk) + + # Assert + MockGetSubmissionMetaUpdateCurl.assert_called_once_with(submission_pk) + MockV1Container.assert_called_once_with( + name="init-container", image="ubuntu", command=expected_command + ) + self.assertEqual(result, MockV1Container.return_value) + + @patch( + "scripts.workers.code_upload_submission_worker.read_job" + ) # Adjust the path if necessary + def test_get_pods_from_job(self, MockReadJob): + # Arrange + api_instance = MagicMock() + core_v1_api_instance = MagicMock() + job_name = "job-123" + job_def = MagicMock() + job_def.metadata.labels = {"controller-uid": "uid-123"} + + # Mock methods + MockReadJob.return_value = job_def + core_v1_api_instance.list_namespaced_pod.return_value = "pod_list" + + expected_label_selector = "controller-uid=uid-123" + + # Act + result = get_pods_from_job( + api_instance, core_v1_api_instance, job_name + ) + + # Assert + MockReadJob.assert_called_once_with(api_instance, job_name) + core_v1_api_instance.list_namespaced_pod.assert_called_once_with( + namespace="default", + label_selector=expected_label_selector, + timeout_seconds=10, + ) + self.assertEqual(result, "pod_list") + + def test_get_job_constraints(self): + # Arrange + challenge_cpu_only = {"cpu_only_jobs": False} + challenge_cpu = { + "cpu_only_jobs": True, + "job_cpu_cores": "4", + "job_memory": "16Gi", + } + + # Act + result_cpu_only = get_job_constraints(challenge_cpu_only) + result_cpu = get_job_constraints(challenge_cpu) + + # Assert + self.assertEqual(result_cpu_only, {"nvidia.com/gpu": "1"}) + self.assertEqual(result_cpu, {"cpu": "4", "memory": "16Gi"}) + + @patch("scripts.workers.code_upload_submission_worker.get_job_object") + @patch("scripts.workers.code_upload_submission_worker.get_volume_list") + @patch("scripts.workers.code_upload_submission_worker.get_init_container") + @patch("scripts.workers.code_upload_submission_worker.get_job_constraints") + @patch( + "scripts.workers.code_upload_submission_worker.get_volume_mount_list" + ) + @patch("scripts.workers.code_upload_submission_worker.client.V1JobSpec") + @patch( + "scripts.workers.code_upload_submission_worker.client.V1PodTemplateSpec" + ) + 
@patch("scripts.workers.code_upload_submission_worker.client.V1PodSpec") + @patch("scripts.workers.code_upload_submission_worker.client.V1Container") + @patch("scripts.workers.code_upload_submission_worker.client.V1EnvVar") + def test_create_job_object( + self, + MockV1EnvVar, + MockV1Container, + MockV1PodSpec, + MockV1PodTemplateSpec, + MockV1JobSpec, + MockGetVolumeMountList, + MockGetJobConstraints, + MockGetInitContainer, + MockGetVolumeList, + MockGetJobObject, + ): + # Arrange + message = { + "submission_pk": 123, + "submitted_image_uri": "test-image-uri", + "submission_meta": {"submission_time_limit": 3600}, + } + environment_image = "env-image" + challenge = {"cpu_only_jobs": False} + + # Mock objects returned by the functions + MockV1EnvVar.side_effect = lambda name, value: f"mock-envvar-{name}" + MockV1Container.side_effect = lambda *args, **kwargs: "mock-container" + MockV1PodSpec.return_value = "mock-podspec" + MockV1PodTemplateSpec.return_value = "mock-podtemplatespec" + MockV1JobSpec.return_value = "mock-jobspec" + MockGetVolumeMountList.return_value = "mock-volume-mount-list" + MockGetJobConstraints.return_value = {"nvidia.com/gpu": "1"} + MockGetInitContainer.return_value = "mock-init-container" + MockGetVolumeList.return_value = "mock-volume-list" + MockGetJobObject.return_value = "mock-job-object" + + # Act + result = create_job_object(message, environment_image, challenge) + + # Assert + self.assertEqual(result, "mock-job-object") + + # Verify the mocked functions were called with the expected arguments + MockGetVolumeMountList.assert_called_once_with("/dataset") + MockGetJobConstraints.assert_called_once_with(challenge) + MockGetInitContainer.assert_called_once_with(message["submission_pk"]) + MockGetVolumeList.assert_called_once() + MockGetJobObject.assert_called_once_with(123, "mock-jobspec") + MockV1EnvVar.assert_any_call(name="PYTHONUNBUFFERED", value="1") + MockV1EnvVar.assert_any_call(name="AUTH_TOKEN", value="auth_token") + MockV1EnvVar.assert_any_call( + name="BODY", + value=( + '{"submission_pk": 123, "submitted_image_uri": "test-image-uri", ' + '"submission_meta": {"submission_time_limit": 3600}}' + ), + ) + MockV1EnvVar.assert_any_call( + name="SUBMISSION_TIME_LIMIT", value="3600" + ) + + MockV1Container.assert_any_call( + name="agent", + image=message["submitted_image_uri"], + env=["mock-envvar-PYTHONUNBUFFERED"], + ) + + MockV1Container.assert_any_call( + name="environment", + image=environment_image, + env=[ + "mock-envvar-PYTHONUNBUFFERED", + "mock-envvar-AUTH_TOKEN", + "mock-envvar-EVALAI_API_SERVER", + "mock-envvar-BODY", + "mock-envvar-SUBMISSION_TIME_LIMIT", + ], + resources=client.V1ResourceRequirements( + limits={"nvidia.com/gpu": "1"} + ), + volume_mounts="mock-volume-mount-list", + ) + + MockV1PodSpec.assert_called_once_with( + init_containers=["mock-init-container"], + containers=["mock-container", "mock-container"], + restart_policy="Never", + volumes="mock-volume-list", + ) + + MockV1PodTemplateSpec.assert_called_once_with( + metadata=client.V1ObjectMeta(labels={"app": "evaluation"}), + spec="mock-podspec", + ) + + MockV1JobSpec.assert_called_once_with( + backoff_limit=1, template="mock-podtemplatespec" + ) + + @patch("scripts.workers.code_upload_submission_worker.get_job_object") + @patch( + "scripts.workers.code_upload_submission_worker.get_empty_volume_object" + ) + @patch( + "scripts.workers.code_upload_submission_worker.get_volume_mount_object" + ) + @patch( + "scripts.workers.code_upload_submission_worker.get_config_map_volume_object" + 
) + @patch("scripts.workers.code_upload_submission_worker.get_volume_list") + @patch( + "scripts.workers.code_upload_submission_worker.get_volume_mount_list" + ) + @patch("scripts.workers.code_upload_submission_worker.get_init_container") + @patch("scripts.workers.code_upload_submission_worker.get_job_constraints") + @patch("scripts.workers.code_upload_submission_worker.client.V1EnvVar") + @patch("scripts.workers.code_upload_submission_worker.client.V1Container") + @patch("scripts.workers.code_upload_submission_worker.client.V1PodSpec") + @patch( + "scripts.workers.code_upload_submission_worker.client.V1PodTemplateSpec" + ) + @patch("scripts.workers.code_upload_submission_worker.client.V1JobSpec") + def test_create_static_code_upload_submission_job_object( + self, + MockV1JobSpec, + MockV1PodTemplateSpec, + MockV1PodSpec, + MockV1Container, + MockV1EnvVar, + MockGetJobConstraints, + MockGetInitContainer, + MockGetVolumeMountList, + MockGetVolumeList, + MockGetConfigMapVolumeObject, + MockGetVolumeMountObject, + MockGetEmptyVolumeObject, + MockGetJobObject, + ): + # Arrange + message = { + "submission_pk": 123, + "submitted_image_uri": "test-image-uri", + "challenge_pk": 1, + "phase_pk": 2, + "submission_meta": {"submission_time_limit": 3600}, + } + challenge = {"cpu_only_jobs": False} + + # Mock return values + MockV1EnvVar.side_effect = lambda name, value: f"mock-envvar-{name}" + MockV1Container.side_effect = lambda *args, **kwargs: "mock-container" + MockV1PodSpec.return_value = "mock-podspec" + MockV1PodTemplateSpec.return_value = "mock-podtemplatespec" + MockV1JobSpec.return_value = "mock-jobspec" + MockGetJobConstraints.return_value = {"nvidia.com/gpu": "1"} + MockGetInitContainer.return_value = "mock-init-container" + MockGetVolumeMountList.return_value = [] + MockGetVolumeList.return_value = [] + MockGetConfigMapVolumeObject.return_value = "mock-config-map-volume" + MockGetVolumeMountObject.side_effect = ( + lambda mount_path, name, sub_path=None: f"mock-volume-mount-{name}" + ) + MockGetEmptyVolumeObject.return_value = "mock-empty-volume" + MockGetJobObject.return_value = "mock-job-object" + + # Act + result = create_static_code_upload_submission_job_object( + message, challenge + ) + + # Assert + self.assertEqual(result, "mock-job-object") + + # Verify calls + MockV1EnvVar.assert_any_call(name="PYTHONUNBUFFERED", value="1") + MockV1EnvVar.assert_any_call(name="SUBMISSION_PK", value="123") + MockV1EnvVar.assert_any_call(name="CHALLENGE_PK", value="1") + MockV1EnvVar.assert_any_call(name="PHASE_PK", value="2") + MockV1EnvVar.assert_any_call( + name="SUBMISSION_TIME_LIMIT", value="3600" + ) + MockV1EnvVar.assert_any_call(name="SUBMISSION_TIME_DELTA", value="300") + MockV1EnvVar.assert_any_call(name="AUTH_TOKEN", value="auth_token") + MockV1EnvVar.assert_any_call( + name="EVALAI_API_SERVER", value=EVALAI_API_SERVER + ) + + MockGetVolumeMountList.assert_called_once_with("/dataset", True) + MockGetVolumeList.assert_called_once() + MockGetConfigMapVolumeObject.assert_called_once_with( + "evalai-scripts-cm", "evalai-scripts" + ) + MockGetVolumeMountObject.assert_any_call( + "/evalai_scripts", "evalai-scripts", True + ) + MockGetVolumeMountObject.assert_any_call( + "/submission", "submissions-dir" + ) + MockGetEmptyVolumeObject.assert_called_once_with("submissions-dir") + MockGetJobObject.assert_called_once_with(123, "mock-jobspec") + + # Verify calls for volumes and mounts + MockGetVolumeList.assert_called_once() + MockGetEmptyVolumeObject.assert_called_once_with("submissions-dir") + 
MockGetVolumeMountObject.assert_any_call( + "/submission", "submissions-dir" + ) + self.assertIn("mock-config-map-volume", MockGetVolumeList.return_value) + self.assertIn( + "mock-volume-mount-submissions-dir", + MockGetVolumeMountList.return_value, + ) + + # Verify container configurations + MockV1Container.assert_any_call( + name="submission", + image="test-image-uri", + env=[ + "mock-envvar-PYTHONUNBUFFERED", + "mock-envvar-SUBMISSION_PATH", + "mock-envvar-CHALLENGE_PK", + "mock-envvar-PHASE_PK", + ], + resources=client.V1ResourceRequirements( + limits={"nvidia.com/gpu": "1"} + ), + volume_mounts=MockGetVolumeMountList.return_value, + ) + + MockV1Container.assert_any_call( + name="sidecar-container", + image="ubuntu:latest", + command=[ + "/bin/sh", + "-c", + "apt update && apt install -y curl && sh /evalai_scripts/monitor_submission.sh", + ], + env=[ + "mock-envvar-SUBMISSION_PATH", + "mock-envvar-CHALLENGE_PK", + "mock-envvar-PHASE_PK", + "mock-envvar-SUBMISSION_PK", + "mock-envvar-AUTH_TOKEN", + "mock-envvar-EVALAI_API_SERVER", + "mock-envvar-SUBMISSION_TIME_LIMIT", + "mock-envvar-SUBMISSION_TIME_DELTA", + ], + volume_mounts=MockGetVolumeMountList.return_value, + ) + + # Verify job creation + MockV1JobSpec.assert_called_once_with( + backoff_limit=1, template="mock-podtemplatespec" + ) + MockGetJobObject.assert_called_once_with(123, "mock-jobspec") + + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_create_job(self, MockLogger): + # Arrange + mock_api_instance = MagicMock() + mock_job = MagicMock() + mock_api_response = MagicMock() + mock_api_instance.create_namespaced_job.return_value = ( + mock_api_response + ) + + # Act + response = create_job(mock_api_instance, mock_job) + + # Assert + mock_api_instance.create_namespaced_job.assert_called_once_with( + body=mock_job, namespace="default", pretty=True + ) + MockLogger.info.assert_called_once_with( + "Job created with status='%s'" % str(mock_api_response.status) + ) + self.assertEqual(response, mock_api_response) + + @patch( + "scripts.workers.code_upload_submission_worker.client.V1DeleteOptions" + ) + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_delete_job(self, MockLogger, MockV1DeleteOptions): + # Arrange + mock_api_instance = MagicMock() + mock_job_name = "test-job" + mock_api_response = MagicMock() + mock_api_instance.delete_namespaced_job.return_value = ( + mock_api_response + ) + mock_delete_options = MagicMock() + MockV1DeleteOptions.return_value = mock_delete_options + + # Act + delete_job(mock_api_instance, mock_job_name) + + # Assert + mock_api_instance.delete_namespaced_job.assert_called_once_with( + name=mock_job_name, + namespace="default", + body=mock_delete_options, + ) + MockLogger.info.assert_called_once_with( + "Job deleted with status='%s'" % str(mock_api_response.status) + ) + MockV1DeleteOptions.assert_called_once_with( + propagation_policy="Foreground", grace_period_seconds=5 + ) + + @patch("scripts.workers.code_upload_submission_worker.create_job") + @patch( + "scripts.workers.code_upload_submission_worker.create_static_code_upload_submission_job_object" + ) + @patch("scripts.workers.code_upload_submission_worker.create_job_object") + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_process_submission_callback( + self, + MockLogger, + MockCreateJobObject, + MockCreateStaticJobObject, + MockCreateJob, + ): + # Arrange + mock_api_instance = MagicMock() + mock_body = { + "submission_pk": 123, + 
"is_static_dataset_code_upload_submission": False, + "challenge_pk": 1, + } + mock_challenge_phase = {"environment_image": "test-image"} + mock_challenge = MagicMock() + mock_evalai = MagicMock() + + mock_job_object = MagicMock() + MockCreateJobObject.return_value = mock_job_object + mock_response = MagicMock() + mock_response.metadata.name = "test-job-name" + MockCreateJob.return_value = mock_response + + # Act + process_submission_callback( + mock_api_instance, + mock_body, + mock_challenge_phase, + mock_challenge, + mock_evalai, + ) + + # Assert + MockLogger.info.assert_called_with( + "[x] Received submission message %s" % mock_body + ) + MockCreateJobObject.assert_called_once_with( + mock_body, + mock_challenge_phase["environment_image"], + mock_challenge, + ) + MockCreateJob.assert_called_once_with( + mock_api_instance, mock_job_object + ) + mock_evalai.update_submission_status.assert_called_once_with( + { + "submission_status": "queued", + "submission": 123, + "job_name": "test-job-name", + }, + mock_body["challenge_pk"], + ) + + @patch("scripts.workers.code_upload_submission_worker.create_job") + @patch( + "scripts.workers.code_upload_submission_worker.create_static_code_upload_submission_job_object" + ) + @patch("scripts.workers.code_upload_submission_worker.create_job_object") + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_process_submission_callback_static_job( + self, + MockLogger, + MockCreateJobObject, + MockCreateStaticJobObject, + MockCreateJob, + ): + # Arrange + mock_api_instance = MagicMock() + mock_body = { + "submission_pk": 123, + "is_static_dataset_code_upload_submission": True, + "challenge_pk": 1, + } + mock_challenge_phase = MagicMock() + mock_challenge = MagicMock() + mock_evalai = MagicMock() + + mock_static_job_object = MagicMock() + MockCreateStaticJobObject.return_value = mock_static_job_object + mock_response = MagicMock() + mock_response.metadata.name = "test-job-name" + MockCreateJob.return_value = mock_response + + # Act + process_submission_callback( + mock_api_instance, + mock_body, + mock_challenge_phase, + mock_challenge, + mock_evalai, + ) + + # Assert + MockLogger.info.assert_called_with( + "[x] Received submission message %s" % mock_body + ) + MockCreateStaticJobObject.assert_called_once_with( + mock_body, mock_challenge + ) + MockCreateJob.assert_called_once_with( + mock_api_instance, mock_static_job_object + ) + mock_evalai.update_submission_status.assert_called_once_with( + { + "submission_status": "queued", + "submission": 123, + "job_name": "test-job-name", + }, + mock_body["challenge_pk"], + ) + + @patch("scripts.workers.code_upload_submission_worker.client.BatchV1Api") + @patch("scripts.workers.code_upload_submission_worker.client.ApiClient") + @patch( + "scripts.workers.code_upload_submission_worker.client.Configuration" + ) + def test_get_api_object( + self, MockConfiguration, MockApiClient, MockBatchV1Api + ): + # Arrange + mock_challenge = {"id": 1} + mock_evalai = MagicMock() + mock_evalai.get_aws_eks_bearer_token.return_value = { + "aws_eks_bearer_token": "fake-token" + } + + # Act + api_instance = get_api_object( + "fake-cluster", "fake-endpoint", mock_challenge, mock_evalai + ) + + # Assert + MockConfiguration.assert_called_once() + MockConfiguration().host = "fake-endpoint" + MockConfiguration().verify_ssl = True + MockConfiguration().ssl_ca_cert = ( + "/code/scripts/workers/certificate.crt" + ) + MockConfiguration().api_key["authorization"] = "fake-token" + 
MockConfiguration().api_key_prefix["authorization"] = "Bearer" + MockApiClient.assert_called_once_with(MockConfiguration()) + MockBatchV1Api.assert_called_once_with(MockApiClient()) + self.assertEqual(api_instance, MockBatchV1Api()) + + @patch("scripts.workers.code_upload_submission_worker.client.ApiClient") + @patch( + "scripts.workers.code_upload_submission_worker.client.Configuration" + ) + def test_get_api_client(self, MockConfiguration, MockApiClient): + # Arrange + mock_challenge = {"id": 1} + mock_evalai = MagicMock() + mock_evalai.get_aws_eks_bearer_token.return_value = { + "aws_eks_bearer_token": "fake-token" + } + + # Act + api_instance = get_api_client( + "fake-cluster", "fake-endpoint", mock_challenge, mock_evalai + ) + + # Assert + MockConfiguration.assert_called_once() + MockConfiguration().host = "fake-endpoint" + MockConfiguration().verify_ssl = True + MockConfiguration().ssl_ca_cert = ( + "/code/scripts/workers/certificate.crt" + ) + MockConfiguration().api_key["authorization"] = "fake-token" + MockConfiguration().api_key_prefix["authorization"] = "Bearer" + MockApiClient.assert_called_once_with(MockConfiguration()) + self.assertEqual(api_instance, MockApiClient()) + + @patch("scripts.workers.code_upload_submission_worker.client.CoreV1Api") + @patch("scripts.workers.code_upload_submission_worker.client.ApiClient") + @patch( + "scripts.workers.code_upload_submission_worker.client.Configuration" + ) + def test_get_core_v1_api_object( + self, MockConfiguration, MockApiClient, MockCoreV1Api + ): + # Arrange + mock_challenge = {"id": 1} + mock_evalai = MagicMock() + mock_evalai.get_aws_eks_bearer_token.return_value = { + "aws_eks_bearer_token": "fake-token" + } + + # Act + api_instance = get_core_v1_api_object( + "fake-cluster", "fake-endpoint", mock_challenge, mock_evalai + ) + + # Assert + MockConfiguration.assert_called_once() + MockConfiguration().host = "fake-endpoint" + MockConfiguration().verify_ssl = True + MockConfiguration().ssl_ca_cert = ( + "/code/scripts/workers/certificate.crt" + ) + MockConfiguration().api_key["authorization"] = "fake-token" + MockConfiguration().api_key_prefix["authorization"] = "Bearer" + MockApiClient.assert_called_once_with(MockConfiguration()) + MockCoreV1Api.assert_called_once_with(MockApiClient()) + self.assertEqual(api_instance, MockCoreV1Api()) + + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_read_job(self, MockLogger): + # Arrange + mock_api_instance = MagicMock() + mock_job_name = "test-job" + mock_namespace = "default" + mock_api_response = MagicMock() + mock_api_instance.read_namespaced_job.return_value = mock_api_response + + # Act + api_response = read_job(mock_api_instance, mock_job_name) + + # Assert + mock_api_instance.read_namespaced_job.assert_called_once_with( + mock_job_name, mock_namespace + ) + self.assertEqual(api_response, mock_api_response) + MockLogger.exception.assert_not_called() + + @patch("scripts.workers.code_upload_submission_worker.cleanup_submission") + @patch("scripts.workers.code_upload_submission_worker.get_pods_from_job") + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_no_pods_returned( + self, MockLogger, mock_get_pods_from_job, mock_cleanup_submission + ): + # Arrange + mock_api_instance = MagicMock() + mock_core_v1_api_instance = MagicMock() + mock_evalai = MagicMock() + mock_get_pods_from_job.return_value = None + + # Act + update_failed_jobs_and_send_logs( + mock_api_instance, + mock_core_v1_api_instance, + mock_evalai, + "test-job", + 1, + 1, + 
1, + "Test message", + "test-queue", + False, + False, + ) + + # Assert + MockLogger.exception.assert_called_once_with( + "Exception while reading Job test-job, does not exist." + ) + mock_cleanup_submission.assert_called_once() + + @patch("scripts.workers.code_upload_submission_worker.cleanup_submission") + @patch("scripts.workers.code_upload_submission_worker.get_pods_from_job") + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_pods_in_pending_state( + self, MockLogger, mock_get_pods_from_job, mock_cleanup_submission + ): + # Arrange + mock_api_instance = MagicMock() + mock_core_v1_api_instance = MagicMock() + mock_evalai = MagicMock() + mock_pods_list = MagicMock() + mock_pods_list.items[0].status.container_statuses = None + mock_get_pods_from_job.return_value = mock_pods_list + + # Act + update_failed_jobs_and_send_logs( + mock_api_instance, + mock_core_v1_api_instance, + mock_evalai, + "test-job", + 1, + 1, + 1, + "Test message", + "test-queue", + False, + False, + ) + + # Assert + MockLogger.info.assert_called_once_with( + "Job pods in pending state, waiting for node assignment for submission 1" + ) + mock_cleanup_submission.assert_not_called() + + @patch("scripts.workers.code_upload_submission_worker.cleanup_submission") + @patch("scripts.workers.code_upload_submission_worker.get_pods_from_job") + @patch("scripts.workers.code_upload_submission_worker.logger") + def test_general_exception_handling( + self, MockLogger, mock_get_pods_from_job, mock_cleanup_submission + ): + # Arrange + mock_api_instance = MagicMock() + mock_core_v1_api_instance = MagicMock() + mock_evalai = MagicMock() + mock_get_pods_from_job.side_effect = Exception("General Exception") + + # Act + update_failed_jobs_and_send_logs( + mock_api_instance, + mock_core_v1_api_instance, + mock_evalai, + "test-job", + 1, + 1, + 1, + "Test message", + "test-queue", + False, + False, + ) + + # Assert + MockLogger.exception.assert_called_once_with( + "Exception while reading Job General Exception" + ) + mock_cleanup_submission.assert_called_once() + + @patch( + "scripts.workers.code_upload_submission_worker.open", + new_callable=mock_open, + read_data="mock manifest content", + ) + @patch("scripts.workers.code_upload_submission_worker.yaml.load") + @patch("scripts.workers.code_upload_submission_worker.client.AppsV1Api") + def test_install_gpu_drivers_success( + self, mock_apps_api, mock_yaml_load, mock_open_file + ): + # Setup + mock_api_instance = MagicMock() + mock_apps_api_instance = mock_apps_api.return_value + mock_yaml_load.return_value = {"metadata": {"namespace": "default"}} + + # Run + install_gpu_drivers(mock_api_instance) + + # Assertions + mock_open_file.assert_called_once_with( + "/code/scripts/workers/code_upload_worker_utils/nvidia-device-plugin.yml" + ) + mock_yaml_load.assert_called_once_with( + "mock manifest content", yaml.FullLoader + ) # Updated this line + mock_apps_api_instance.create_namespaced_daemon_set.assert_called_once_with( + "default", mock_yaml_load.return_value + ) + + @patch( + "scripts.workers.code_upload_submission_worker.open", + new_callable=mock_open, + read_data="mock manifest content", + ) + @patch("scripts.workers.code_upload_submission_worker.yaml.load") + @patch("scripts.workers.code_upload_submission_worker.client.AppsV1Api") + def test_install_gpu_drivers_already_installed( + self, mock_apps_api, mock_yaml_load, mock_open_file + ): + # Setup + mock_api_instance = MagicMock() + mock_apps_api_instance = mock_apps_api.return_value + mock_yaml_load.return_value 
= {"metadata": {"namespace": "default"}} + mock_apps_api_instance.create_namespaced_daemon_set.side_effect = ( + ApiException(status=409) + ) + + # Run + install_gpu_drivers(mock_api_instance) + + # Assertions + mock_open_file.assert_called_once() + mock_yaml_load.assert_called_once() + mock_apps_api_instance.create_namespaced_daemon_set.assert_called_once() + + @patch( + "scripts.workers.code_upload_submission_worker.open", + new_callable=mock_open, + read_data="mock manifest content", + ) + @patch("scripts.workers.code_upload_submission_worker.yaml.load") + @patch("scripts.workers.code_upload_submission_worker.client.AppsV1Api") + def test_install_gpu_drivers_raises_exception( + self, mock_apps_api, mock_yaml_load, mock_open_file + ): + # Setup + mock_api_instance = MagicMock() + mock_apps_api_instance = mock_apps_api.return_value + mock_yaml_load.return_value = {"metadata": {"namespace": "default"}} + mock_apps_api_instance.create_namespaced_daemon_set.side_effect = ( + ApiException(status=500) + ) + + # Run and Assert + with self.assertRaises(ApiException): + install_gpu_drivers(mock_api_instance) + + mock_open_file.assert_called_once() + mock_yaml_load.assert_called_once() + mock_apps_api_instance.create_namespaced_daemon_set.assert_called_once() + + @patch("scripts.workers.code_upload_submission_worker.GracefulKiller") + @patch("scripts.workers.code_upload_submission_worker.EvalAI_Interface") + @patch("scripts.workers.code_upload_submission_worker.get_api_client") + @patch("scripts.workers.code_upload_submission_worker.get_api_object") + @patch( + "scripts.workers.code_upload_submission_worker.get_core_v1_api_object" + ) + @patch("scripts.workers.code_upload_submission_worker.install_gpu_drivers") + @patch( + "scripts.workers.code_upload_submission_worker.create_script_config_map" + ) + @patch("scripts.workers.code_upload_submission_worker.create_configmap") + def test_main_successful_execution( + self, + mock_create_configmap, + mock_create_script_config_map, + mock_install_gpu_drivers, + mock_get_core_v1_api_object, + mock_get_api_object, + mock_get_api_client, + mock_evalai, + mock_killer, + ): + # Setup + mock_evalai_instance = mock_evalai.return_value + mock_evalai_instance.get_challenge_by_queue_name.return_value = { + "title": "Test Challenge", + "remote_evaluation": 0, + "cpu_only_jobs": False, + "is_static_dataset_code_upload": True, + "id": 1, + "submission_time_limit": 100, + } + mock_evalai_instance.get_aws_eks_cluster_details.return_value = { + "name": "test-cluster", + "cluster_endpoint": "https://cluster-endpoint", + } + mock_killer_instance = mock_killer.return_value + mock_killer_instance.kill_now = True + + # Run + main() + + # Assertions + mock_evalai_instance.get_challenge_by_queue_name.assert_called() # Updated to allow multiple calls + assert ( + mock_evalai_instance.get_challenge_by_queue_name.call_count == 2 + ) # Verifies the number of calls + + @patch("scripts.workers.code_upload_submission_worker.GracefulKiller") + @patch("scripts.workers.code_upload_submission_worker.EvalAI_Interface") + @patch("scripts.workers.code_upload_submission_worker.get_api_client") + @patch("scripts.workers.code_upload_submission_worker.get_api_object") + @patch( + "scripts.workers.code_upload_submission_worker.get_core_v1_api_object" + ) + def test_main_without_gpu_installation( + self, + mock_get_core_v1_api_object, + mock_get_api_object, + mock_get_api_client, + mock_evalai, + mock_killer, + ): + # Setup + mock_evalai_instance = mock_evalai.return_value + 
mock_evalai_instance.get_challenge_by_queue_name.return_value = { + "title": "Test Challenge", + "remote_evaluation": 0, + "cpu_only_jobs": True, + "is_static_dataset_code_upload": False, + "id": 1, + "submission_time_limit": 100, + } + mock_killer_instance = mock_killer.return_value + mock_killer_instance.kill_now = True + + # Run + main() + + # Assertions + mock_evalai_instance.get_challenge_by_queue_name.assert_called() + assert mock_evalai_instance.get_challenge_by_queue_name.call_count == 2 diff --git a/tests/unit/worker/test_remote_submission_worker.py b/tests/unit/worker/test_remote_submission_worker.py new file mode 100644 index 0000000000..2d60d28c56 --- /dev/null +++ b/tests/unit/worker/test_remote_submission_worker.py @@ -0,0 +1,424 @@ +import logging +import os +import unittest +import signal +import io +import sys +from unittest.mock import Mock, mock_open, patch +import requests +from scripts.workers.remote_submission_worker import ( + CHALLENGE_DATA_BASE_DIR, + GracefulKiller, + download_and_extract_zip_file, + extract_challenge_data, + load_challenge, + make_request, + process_submission_message, + read_file_content, + stdout_redirect, + stderr_redirect, + alarm_handler, + ExecutionTimeLimitExceeded, +) + +logger = logging.getLogger() + + +class TestGracefulKiller(unittest.TestCase): + + def test_initial_state(self): + killer = GracefulKiller() + self.assertFalse(killer.kill_now, "kill_now should be False initially") + + def test_exit_gracefully(self): + killer = GracefulKiller() + killer.exit_gracefully(signum=signal.SIGINT, frame=None) + self.assertTrue( + killer.kill_now, "kill_now should be True after signal is received" + ) + + def test_signal_handlers(self): + killer = GracefulKiller() + self.assertEqual( + signal.getsignal(signal.SIGINT), killer.exit_gracefully + ) + self.assertEqual( + signal.getsignal(signal.SIGTERM), killer.exit_gracefully + ) + + +class TestContextManagers(unittest.TestCase): + + def test_stdout_redirect(self): + with io.StringIO() as buf, stdout_redirect(buf): + print("Hello, World!") + self.assertEqual(buf.getvalue(), "Hello, World!\n") + + def test_stderr_redirect(self): + with io.StringIO() as buf, stderr_redirect(buf): + print("Error!", file=sys.stderr) + self.assertEqual(buf.getvalue(), "Error!\n") + + +class TestAlarmHandler(unittest.TestCase): + + def test_alarm_handler(self): + with self.assertRaises(ExecutionTimeLimitExceeded): + alarm_handler(signum=signal.SIGALRM, frame=None) + + +class TestDownloadAndExtractZipFile(unittest.TestCase): + + @patch("requests.get") + @patch("builtins.open", new_callable=mock_open) + @patch("zipfile.ZipFile") + @patch("os.remove") + def test_successful_download_and_extraction( + self, mock_remove, mock_zipfile, mock_open, mock_get + ): + # Simulate a successful file download + mock_get.return_value.status_code = 200 + mock_get.return_value.content = b"Test content" + + download_and_extract_zip_file( + "http://example.com/test.zip", + "/path/to/download.zip", + "/path/to/extract/", + ) + + mock_open.assert_called_once_with("/path/to/download.zip", "wb") + mock_open().write.assert_called_once_with(b"Test content") + mock_zipfile.assert_called_once_with("/path/to/download.zip", "r") + mock_zipfile().extractall.assert_called_once_with("/path/to/extract/") + mock_remove.assert_called_once_with("/path/to/download.zip") + + @patch("requests.get") + @patch("os.remove") + def test_download_failure(self, mock_remove, mock_get): + # Simulate a failed file download + mock_get.side_effect = Exception("Network error") 
+ + with self.assertLogs(logger, level="ERROR") as log: + download_and_extract_zip_file( + "http://example.com/test.zip", + "/path/to/download.zip", + "/path/to/extract/", + ) + self.assertIn( + "Failed to fetch file from http://example.com/test.zip, error Network error", + log.output[0], + ) + + mock_remove.assert_not_called() + + @patch("requests.get") + @patch("builtins.open", new_callable=mock_open) + @patch("zipfile.ZipFile") + @patch("os.remove") + def test_file_removal_failure( + self, mock_remove, mock_zipfile, mock_open, mock_get + ): + # Simulate a successful file download and extraction, but removal fails + mock_get.return_value.status_code = 200 + mock_get.return_value.content = b"Test content" + mock_remove.side_effect = Exception("File removal error") + + with self.assertLogs(logger, level="ERROR") as log: + download_and_extract_zip_file( + "http://example.com/test.zip", + "/path/to/download.zip", + "/path/to/extract/", + ) + self.assertIn( + "Failed to remove zip file /path/to/download.zip, error File removal error", + log.output[0], + ) + + mock_open().write.assert_called_once_with(b"Test content") + mock_zipfile().extractall.assert_called_once_with("/path/to/extract/") + + +class TestLoadChallenge(unittest.TestCase): + + @patch( + "scripts.workers.remote_submission_worker.get_challenge_by_queue_name" + ) + @patch( + "scripts.workers.remote_submission_worker.get_challenge_phases_by_challenge_pk" + ) + @patch("scripts.workers.remote_submission_worker.extract_challenge_data") + @patch( + "scripts.workers.remote_submission_worker.create_dir_as_python_package" + ) + def test_successful_load_challenge( + self, + mock_create_dir, + mock_extract_data, + mock_get_phases, + mock_get_challenge, + ): + # Mock the challenge and phases data + mock_challenge = { + "id": 1, + "evaluation_script": "http://example.com/evaluation_script.zip", + } + mock_get_challenge.return_value = mock_challenge + mock_get_phases.return_value = [ + {"id": 1, "test_annotation": "http://example.com/annotation.txt"} + ] + + load_challenge() + + mock_create_dir.assert_called_once_with(CHALLENGE_DATA_BASE_DIR) + mock_get_challenge.assert_called_once() + mock_get_phases.assert_called_once_with(1) + mock_extract_data.assert_called_once_with( + mock_challenge, mock_get_phases.return_value + ) + + @patch( + "scripts.workers.remote_submission_worker.get_challenge_by_queue_name" + ) + @patch( + "scripts.workers.remote_submission_worker.create_dir_as_python_package" + ) + def test_load_challenge_exception( + self, mock_create_dir, mock_get_challenge + ): + # Simulate an exception when fetching the challenge + mock_get_challenge.side_effect = Exception("Challenge not found") + + with self.assertLogs(logger, level="ERROR") as log: + with self.assertRaises(Exception): + load_challenge() + self.assertIn("Challenge with queue name", log.output[0]) + + mock_create_dir.assert_called_once_with(CHALLENGE_DATA_BASE_DIR) + + +class TestExtractChallengeData(unittest.TestCase): + @patch( + "scripts.workers.remote_submission_worker.download_and_extract_file" + ) + @patch("scripts.workers.remote_submission_worker.importlib.import_module") + @patch("scripts.workers.remote_submission_worker.create_dir") + @patch( + "scripts.workers.remote_submission_worker.create_dir_as_python_package" + ) + def test_extract_challenge_data_import_exception( + self, + mock_create_dir_as_pkg, + mock_create_dir, + mock_import_module, + mock_download_file, + ): + # Mock the challenge and phases data + mock_challenge = { + "id": 1, + "evaluation_script": 
"http://example.com/evaluation_script.zip", + } + mock_phases = [ + {"id": 1, "test_annotation": "http://example.com/annotation.txt"} + ] + + # Simulate an exception during import + mock_import_module.side_effect = ImportError("Import failed") + + with self.assertLogs(logger, level="ERROR") as log: + with self.assertRaises(ImportError): + extract_challenge_data(mock_challenge, mock_phases) + self.assertIn( + "Exception raised while creating Python module for challenge_id: 1", + log.output[0], + ) + + mock_create_dir_as_pkg.assert_called_once_with( + os.path.join(CHALLENGE_DATA_BASE_DIR, "challenge_1") + ) + + mock_download_file.assert_called_once() + + @patch( + "scripts.workers.remote_submission_worker.download_and_extract_file" + ) + @patch( + "scripts.workers.remote_submission_worker.download_and_extract_zip_file" + ) + @patch("scripts.workers.remote_submission_worker.importlib.import_module") + @patch("scripts.workers.remote_submission_worker.create_dir") + @patch( + "scripts.workers.remote_submission_worker.create_dir_as_python_package" + ) + def test_successful_extract_challenge_data( + self, + mock_create_dir_as_pkg, + mock_create_dir, + mock_import_module, + mock_download_zip, + mock_download_file, + ): + # Mock the challenge and phases data + mock_challenge = { + "id": 1, + "evaluation_script": "http://example.com/evaluation_script.zip", + } + mock_phases = [ + {"id": 1, "test_annotation": "http://example.com/annotation.txt"} + ] + + extract_challenge_data(mock_challenge, mock_phases) + + mock_create_dir_as_pkg.assert_called_once_with( + os.path.join(CHALLENGE_DATA_BASE_DIR, "challenge_1") + ) + + mock_download_file.assert_called_once() + mock_download_zip.assert_called_once() + mock_import_module.assert_called_once() + + @patch( + "scripts.workers.remote_submission_worker.get_challenge_by_queue_name" + ) + @patch( + "scripts.workers.remote_submission_worker.get_challenge_phase_by_pk" + ) + @patch("scripts.workers.remote_submission_worker.extract_submission_data") + @patch("scripts.workers.remote_submission_worker.run_submission") + def test_process_submission_message_success( + self, + mock_run_submission, + mock_extract_submission_data, + mock_get_challenge_phase_by_pk, + mock_get_challenge_by_queue_name, + ): + # Mock data + mock_message = {"challenge_pk": 1, "phase_pk": 2, "submission_pk": 3} + mock_submission_instance = {"input_file": "file.txt"} + mock_challenge = {"remote_evaluation": True} + mock_challenge_phase = Mock() + + # Mocking return values + mock_extract_submission_data.return_value = mock_submission_instance + mock_get_challenge_by_queue_name.return_value = mock_challenge + mock_get_challenge_phase_by_pk.return_value = mock_challenge_phase + + # Call the function + process_submission_message(mock_message) + + # Assertions to ensure all functions are called with correct parameters + mock_extract_submission_data.assert_called_once_with(3) + mock_get_challenge_by_queue_name.assert_called_once() + mock_get_challenge_phase_by_pk.assert_called_once_with(1, 2) + mock_run_submission.assert_called_once() # Removed the incorrect assertion + + @patch("scripts.workers.remote_submission_worker.extract_submission_data") + def test_process_submission_message_no_submission_instance( + self, mock_extract_submission_data + ): + # Mock data + mock_message = {"challenge_pk": 1, "phase_pk": 2, "submission_pk": 3} + + # Mocking return values + mock_extract_submission_data.return_value = None + + # Call the function and expect it to return early + result = 
process_submission_message(mock_message) + self.assertIsNone(result) + mock_extract_submission_data.assert_called_once_with(3) + + @patch("scripts.workers.remote_submission_worker.requests.get") + @patch("scripts.workers.remote_submission_worker.get_request_headers") + @patch("scripts.workers.remote_submission_worker.logger") + def test_make_request_get_exception( + self, mock_logger, mock_get_request_headers, mock_requests_get + ): + # Mock data + url = "http://example.com" + method = "GET" + + # Mock the request to raise an exception + mock_requests_get.side_effect = requests.exceptions.RequestException( + "Connection error" + ) + + # Ensure logger.info is called and exception is raised + with self.assertRaises(requests.exceptions.RequestException): + make_request(url, method) + + # Check if logger.info was called + mock_logger.info.assert_called_once_with( + "The worker is not able to establish connection with EvalAI" + ) + + @patch("scripts.workers.remote_submission_worker.requests.patch") + @patch("scripts.workers.remote_submission_worker.get_request_headers") + @patch("scripts.workers.remote_submission_worker.logger") + def test_make_request_patch_request_exception( + self, mock_logger, mock_get_request_headers, mock_requests_patch + ): + # Mock data + url = "http://example.com" + method = "PATCH" + data = {"key": "value"} + + # Mock the request to raise an exception + mock_requests_patch.side_effect = requests.exceptions.RequestException( + "Connection error" + ) + + # Ensure logger.info is called and exception is raised + with self.assertRaises(requests.exceptions.RequestException): + make_request(url, method, data=data) + + # Check if logger.info was called + mock_logger.info.assert_called_once_with( + "The worker is not able to establish connection with EvalAI" + ) + + @patch("scripts.workers.remote_submission_worker.requests.post") + @patch("scripts.workers.remote_submission_worker.get_request_headers") + @patch("scripts.workers.remote_submission_worker.logger") + def test_make_request_post_request_exception( + self, mock_logger, mock_get_request_headers, mock_requests_post + ): + # Mock data + url = "http://example.com" + method = "POST" + data = {"key": "value"} + + # Mock the request to raise an exception + mock_requests_post.side_effect = requests.exceptions.RequestException( + "Connection error" + ) + + # Ensure logger.info is called and exception is raised + with self.assertRaises(requests.exceptions.RequestException): + make_request(url, method, data=data) + + # Check if logger.info was called + mock_logger.info.assert_called_once_with( + "The worker is not able to establish connection with EvalAI" + ) + + @patch("builtins.open", new_callable=mock_open, read_data="some content") + def test_read_file_content_normal(self, mock_open): + # Test reading a file with some content + file_path = "test_file.txt" + result = read_file_content(file_path) + self.assertEqual(result, "some content") + mock_open.assert_called_once_with(file_path, "r") + + @patch("builtins.open", new_callable=mock_open, read_data="") + def test_read_file_content_empty(self, mock_open): + # Test reading an empty file + file_path = "test_empty_file.txt" + result = read_file_content(file_path) + self.assertEqual(result, " ") + mock_open.assert_called_once_with(file_path, "r") + + @patch("builtins.open", side_effect=FileNotFoundError) + def test_read_file_content_file_not_found(self, mock_open): + # Test reading a file that does not exist + file_path = "non_existent_file.txt" + with 
self.assertRaises(FileNotFoundError): + read_file_content(file_path) diff --git a/tests/unit/worker/test_statsd_utils.py b/tests/unit/worker/test_statsd_utils.py new file mode 100644 index 0000000000..94d623a379 --- /dev/null +++ b/tests/unit/worker/test_statsd_utils.py @@ -0,0 +1,61 @@ +import unittest +from unittest.mock import patch +from scripts.workers.statsd_utils import ( + increment_statsd_counter, + increment_and_push_metrics_to_statsd, +) + + +class TestStatsdFunctions(unittest.TestCase): + + @patch("scripts.workers.statsd_utils.statsd.increment") + def test_increment_statsd_counter(self, mock_increment): + metric_name = "test_metric" + tags = ["tag1", "tag2"] + inc_value = 1 + + increment_statsd_counter(metric_name, tags, inc_value) + + mock_increment.assert_called_once_with( + metric_name, inc_value, tags=tags + ) + + @patch("scripts.workers.statsd_utils.statsd.increment") + def test_increment_statsd_counter_with_exception(self, mock_increment): + metric_name = "test_metric" + tags = ["tag1", "tag2"] + inc_value = 1 + + mock_increment.side_effect = Exception("Mocked exception") + + with self.assertRaises(Exception): + increment_statsd_counter(metric_name, tags, inc_value) + + @patch("scripts.workers.statsd_utils.logger.exception") + @patch("scripts.workers.statsd_utils.statsd.increment") + def test_increment_and_push_metrics_to_statsd_exception( + self, mock_increment, mock_logger_exception + ): + queue_name = "test_queue" + is_remote = True + + mock_increment.side_effect = Exception("Mocked exception") + + increment_and_push_metrics_to_statsd(queue_name, is_remote) + + mock_logger_exception.assert_called_once() + + @patch("scripts.workers.statsd_utils.statsd.increment") + def test_increment_and_push_metrics_to_statsd_success( + self, mock_increment + ): + queue_name = "test_queue" + is_remote = True + + increment_and_push_metrics_to_statsd(queue_name, is_remote) + + mock_increment.assert_called_once_with( + "num_processed_submissions", + 1, + tags=["queue_name:test_queue", "is_remote:1"], + ) diff --git a/tests/unit/worker/test_submission_worker.py b/tests/unit/worker/test_submission_worker.py index 3e17fd4184..96b142ddc5 100644 --- a/tests/unit/worker/test_submission_worker.py +++ b/tests/unit/worker/test_submission_worker.py @@ -1,6 +1,7 @@ import os import shutil import tempfile +from unittest.mock import Mock import zipfile from datetime import timedelta from io import BytesIO @@ -19,7 +20,14 @@ from moto import mock_sqs from participants.models import ParticipantTeam from rest_framework.test import APITestCase - +import signal +from scripts.workers.submission_worker import ( + ExecutionTimeLimitExceeded, + GracefulKiller, + MultiOut, + alarm_handler, + delete_old_temp_directories, +) from scripts.workers.submission_worker import ( create_dir, create_dir_as_python_package, @@ -301,9 +309,9 @@ def test_get_or_create_sqs_queue_for_non_existing_queue(self): @mock_sqs() def test_get_or_create_sqs_queue_for_existing_host_queue(self): get_or_create_sqs_queue("test_host_queue_2", self.challenge2) - queue_url = self.sqs_client.get_queue_url(QueueName="test_host_queue_2")[ - "QueueUrl" - ] + queue_url = self.sqs_client.get_queue_url( + QueueName="test_host_queue_2" + )["QueueUrl"] self.assertTrue(queue_url) self.sqs_client.delete_queue(QueueUrl=queue_url) @@ -447,3 +455,57 @@ def test_delete_zip_file_error(self, mock_remove, mock_logger): delete_zip_file(self.download_location) mock_logger.assert_called_with(error_message) + + def test_sigint_signal(self): + killer = 
GracefulKiller() + os.kill(os.getpid(), signal.SIGINT) + self.assertTrue(killer.kill_now) + + def test_sigterm_signal(self): + killer = GracefulKiller() + os.kill(os.getpid(), signal.SIGTERM) + self.assertTrue(killer.kill_now) + + def test_write(self): + mock_handle1 = Mock() + mock_handle2 = Mock() + multi_out = MultiOut(mock_handle1, mock_handle2) + multi_out.write("test string") + mock_handle1.write.assert_called_with("test string") + mock_handle2.write.assert_called_with("test string") + + def test_flush(self): + mock_handle1 = Mock() + mock_handle2 = Mock() + + multi_out = MultiOut(mock_handle1, mock_handle2) + multi_out.flush() + mock_handle1.flush.assert_called_once() + mock_handle2.flush.assert_called_once() + + def test_alarm_handler(self): + with self.assertRaises(ExecutionTimeLimitExceeded): + alarm_handler(signal.SIGALRM, None) + + @mock.patch("scripts.workers.submission_worker.os.path.getctime") + @mock.patch("scripts.workers.submission_worker.shutil.rmtree") + def test_delete_old_temp_directories(self, mock_rmtree, mock_getctime): + # Create temporary directories + temp_dir = tempfile.gettempdir() + old_dir = tempfile.mkdtemp(prefix="tmp", dir=temp_dir) + new_dir = tempfile.mkdtemp(prefix="tmp", dir=temp_dir) + + # Mock creation times + mock_getctime.side_effect = lambda path: {old_dir: 100, new_dir: 200}[ + path + ] + + # Call the function + delete_old_temp_directories(prefix="tmp") + + # Verify that the old directory is deleted and the new one is not + mock_rmtree.assert_called_once_with(old_dir) + self.assertNotIn(mock.call(new_dir), mock_rmtree.mock_calls) + + # Clean up + shutil.rmtree(new_dir) diff --git a/tests/unit/worker/test_worker_utils.py b/tests/unit/worker/test_worker_utils.py new file mode 100644 index 0000000000..c8614e80da --- /dev/null +++ b/tests/unit/worker/test_worker_utils.py @@ -0,0 +1,223 @@ +import unittest +from unittest.mock import patch, MagicMock + +import requests +from scripts.workers.worker_utils import EvalAI_Interface + + +class TestEvalAIInterface(unittest.TestCase): + def setUp(self): + self.AUTH_TOKEN = "dummy_token" + self.EVALAI_API_SERVER = "http://dummy.api.server" + self.QUEUE_NAME = "dummy_queue" + self.api = EvalAI_Interface( + self.AUTH_TOKEN, self.EVALAI_API_SERVER, self.QUEUE_NAME + ) + + @patch( + "scripts.workers.worker_utils.requests.request" + ) # Adjust the import path + def test_get_request_headers(self, mock_request): + headers = self.api.get_request_headers() + self.assertEqual(headers, {"Authorization": "Bearer dummy_token"}) + + @patch("scripts.workers.worker_utils.requests.request") + def test_make_request_success(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"key": "value"} + mock_request.return_value = mock_response + + url = "http://example.com" + method = "GET" + response = self.api.make_request(url, method) + + self.assertEqual(response, {"key": "value"}) + mock_request.assert_called_once_with( + method=method, + url=url, + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_make_request_failure(self, mock_request): + mock_request.side_effect = requests.exceptions.RequestException + + with self.assertRaises(requests.exceptions.RequestException): + self.api.make_request("http://example.com", "GET") + + def test_return_url_per_environment(self): + url = "/api/test" + full_url = self.api.return_url_per_environment(url) + self.assertEqual(full_url, "http://dummy.api.server/api/test") + + 
@patch("scripts.workers.worker_utils.requests.request") + def test_get_message_from_sqs_queue(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"message": "test"} + mock_request.return_value = mock_response + + response = self.api.get_message_from_sqs_queue() + self.assertEqual(response, {"message": "test"}) + url = "/api/jobs/challenge/queues/dummy_queue/" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_submission_by_pk(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"submission": "data"} + mock_request.return_value = mock_response + + response = self.api.get_submission_by_pk(1) + self.assertEqual(response, {"submission": "data"}) + url = "/api/jobs/submission/1" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_challenge_phases_by_challenge_pk(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"phases": "data"} + mock_request.return_value = mock_response + + response = self.api.get_challenge_phases_by_challenge_pk(1) + self.assertEqual(response, {"phases": "data"}) + url = "/api/challenges/1/phases/" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_challenge_by_queue_name(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"challenge": "data"} + mock_request.return_value = mock_response + + response = self.api.get_challenge_by_queue_name() + self.assertEqual(response, {"challenge": "data"}) + url = "/api/challenges/challenge/queues/dummy_queue/" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_challenge_phase_by_pk(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"phase": "data"} + mock_request.return_value = mock_response + + response = self.api.get_challenge_phase_by_pk(1, 2) + self.assertEqual(response, {"phase": "data"}) + url = "/api/challenges/challenge/1/challenge_phase/2" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_update_submission_data(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"result": "success"} + mock_request.return_value = mock_response + + data = {"submission": "data"} + response = self.api.update_submission_data(data, 1, 2) + self.assertEqual(response, {"result": "success"}) + url = "/api/jobs/challenge/1/update_submission/" + mock_request.assert_called_once_with( + method="PUT", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=data, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_update_submission_status(self, mock_request): + 
mock_response = MagicMock() + mock_response.json.return_value = {"result": "success"} + mock_request.return_value = mock_response + + data = {"status": "finished"} + response = self.api.update_submission_status(data, 1) + self.assertEqual(response, {"result": "success"}) + url = "/api/jobs/challenge/1/update_submission/" + mock_request.assert_called_once_with( + method="PATCH", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=data, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_aws_eks_bearer_token(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"token": "token_data"} + mock_request.return_value = mock_response + + response = self.api.get_aws_eks_bearer_token(1) + self.assertEqual(response, {"token": "token_data"}) + url = "/api/jobs/challenge/1/eks_bearer_token/" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_get_aws_eks_cluster_details(self, mock_request): + mock_response = MagicMock() + mock_response.json.return_value = {"cluster": "details"} + mock_request.return_value = mock_response + + response = self.api.get_aws_eks_cluster_details(1) + self.assertEqual(response, {"cluster": "details"}) + url = "/api/challenges/1/evaluation_cluster/" + mock_request.assert_called_once_with( + method="GET", + url=self.api.return_url_per_environment(url), + headers=self.api.get_request_headers(), + data=None, + ) + + @patch("scripts.workers.worker_utils.requests.request") + def test_delete_message_from_sqs_queue(self, mock_request): + # Mock the response of the requests.request method + mock_response = MagicMock() + mock_response.json.return_value = {"result": "success"} + mock_request.return_value = mock_response + + # Call the method to test + receipt_handle = "handle123" + response = self.api.delete_message_from_sqs_queue(receipt_handle) + # Expected URL construction + expected_url = "http://dummy.api.server/api/jobs/queues/dummy_queue/" + + # Expected data payload + expected_data = {"receipt_handle": receipt_handle} + + # Assertions + mock_request.assert_called_once_with( + method="POST", + url=expected_url, + headers=self.api.get_request_headers(), + data=expected_data, + ) + self.assertEqual(response, {"result": "success"})
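
Note for readers of the statsd tests above: they fully pin down the call shapes expected from scripts/workers/statsd_utils.py. A minimal sketch that would satisfy those assertions is included below for reference; it is reconstructed from the test expectations only, not copied from the module, and the datadog import, the module-level logger, and the NUM_PROCESSED_SUBMISSIONS constant name are assumptions.

import logging

# Assumption: the tests only require a module-level `statsd` object exposing
# `.increment(metric, value, tags=...)`; datadog's client is one such object.
from datadog import statsd

logger = logging.getLogger(__name__)

NUM_PROCESSED_SUBMISSIONS = "num_processed_submissions"


def increment_statsd_counter(metric_name, tags, inc_value):
    # Errors propagate to the caller, matching
    # test_increment_statsd_counter_with_exception.
    statsd.increment(metric_name, inc_value, tags=tags)


def increment_and_push_metrics_to_statsd(queue_name, is_remote):
    # Builds the tag list asserted in the success test and logs (rather than
    # raises) on failure, matching the exception test.
    try:
        tags = [
            "queue_name:{}".format(queue_name),
            "is_remote:{}".format(int(is_remote)),
        ]
        increment_statsd_counter(NUM_PROCESSED_SUBMISSIONS, tags, 1)
    except Exception:
        logger.exception(
            "Error while pushing num_processed_submissions to statsd for %s",
            queue_name,
        )

The split the two exception tests encode is deliberate: the low-level counter helper may raise, but the worker-facing wrapper must never let a metrics failure crash the submission loop.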
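
The new test_submission_worker cases exercise three small helpers (GracefulKiller, MultiOut, alarm_handler) whose behaviour the assertions describe completely. The sketch below illustrates that behaviour using the usual signal-handler recipe; it is an assumption for context, not the actual scripts/workers/submission_worker.py source.

import signal


class ExecutionTimeLimitExceeded(Exception):
    pass


class GracefulKiller:
    # test_sigint_signal / test_sigterm_signal send a real signal to the test
    # process and expect kill_now to flip to True.
    kill_now = False

    def __init__(self):
        signal.signal(signal.SIGINT, self.exit_gracefully)
        signal.signal(signal.SIGTERM, self.exit_gracefully)

    def exit_gracefully(self, signum, frame):
        self.kill_now = True


class MultiOut(object):
    # test_write / test_flush expect writes and flushes to fan out to every
    # handle passed to the constructor.
    def __init__(self, *handles):
        self.handles = handles

    def write(self, s):
        for handle in self.handles:
            handle.write(s)

    def flush(self):
        for handle in self.handles:
            handle.flush()


def alarm_handler(signum, frame):
    # test_alarm_handler expects SIGALRM delivery to surface as this exception.
    raise ExecutionTimeLimitExceeded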
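
Finally, the worker_utils tests all patch a single seam, scripts.workers.worker_utils.requests.request, because every EvalAI_Interface endpoint helper is a thin wrapper over one request function. A minimal sketch of that surface, reconstructed solely from the assertions above (constructor order from setUp, header and URL shapes from the individual tests), is given below; get_message_from_sqs_queue stands in for the other endpoint helpers, and none of this is copied from the real scripts/workers/worker_utils.py.

import requests


class EvalAI_Interface:
    def __init__(self, AUTH_TOKEN, EVALAI_API_SERVER, QUEUE_NAME):
        self.AUTH_TOKEN = AUTH_TOKEN
        self.EVALAI_API_SERVER = EVALAI_API_SERVER
        self.QUEUE_NAME = QUEUE_NAME

    def get_request_headers(self):
        # Header shape asserted in test_get_request_headers.
        return {"Authorization": "Bearer {}".format(self.AUTH_TOKEN)}

    def return_url_per_environment(self, url):
        # Relative API paths are prefixed with the configured server URL.
        return "{}{}".format(self.EVALAI_API_SERVER, url)

    def make_request(self, url, method, data=None):
        # The exact requests.request call the tests assert; the decoded JSON
        # body is returned and request exceptions propagate to the caller.
        response = requests.request(
            method=method,
            url=url,
            headers=self.get_request_headers(),
            data=data,
        )
        return response.json()

    def get_message_from_sqs_queue(self):
        # Representative endpoint helper: builds the queue URL asserted in
        # test_get_message_from_sqs_queue and issues a GET via make_request.
        url = "/api/jobs/challenge/queues/{}/".format(self.QUEUE_NAME)
        return self.make_request(self.return_url_per_environment(url), "GET")

Because the endpoint helpers differ only in the URL, HTTP method, and payload they hand to make_request, each test needs nothing beyond the single requests.request patch and a MagicMock response whose .json() return value is checked.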