diff --git a/reana_job_controller/htcondorcern_job_manager.py b/reana_job_controller/htcondorcern_job_manager.py
index 11e5eb7d..ffe965fc 100644
--- a/reana_job_controller/htcondorcern_job_manager.py
+++ b/reana_job_controller/htcondorcern_job_manager.py
@@ -294,8 +294,19 @@ def spool_output(backend_job_id):
         logging.info("Spooling jobs {} output.".format(backend_job_id))
         schedd.retrieve("ClusterId == {}".format(backend_job_id))
 
-    def get_logs(backend_job_id, workspace):
-        """Return job logs if log files are present."""
+    @classmethod
+    def get_logs(cls, backend_job_id, **kwargs):
+        """Return job logs if log files are present.
+
+        :param backend_job_id: ID of the job in the backend.
+        :param kwargs: Additional parameters needed to fetch logs.
+            In the case of HTCondor, the ``workspace`` parameter is needed.
+        :return: String containing the job logs.
+        """
+        if "workspace" not in kwargs:
+            raise ValueError("Missing 'workspace' parameter")
+        workspace = kwargs["workspace"]
+
         stderr_file = os.path.join(
             workspace, "reana_job." + str(backend_job_id) + ".0.err"
         )
diff --git a/reana_job_controller/job_manager.py b/reana_job_controller/job_manager.py
index 84a6a957..b4236aee 100644
--- a/reana_job_controller/job_manager.py
+++ b/reana_job_controller/job_manager.py
@@ -92,11 +92,14 @@ def get_status(self):
         """
         raise NotImplementedError
 
-    def get_logs(self):
-        """Get job log.
-
-        :returns: stderr, stdout of a job.
-        :rtype: dict
+    @classmethod
+    def get_logs(cls, backend_job_id, **kwargs):
+        """Return job logs if log files are present.
+
+        :param backend_job_id: ID of the job in the backend.
+        :param kwargs: Additional parameters needed to fetch logs.
+            These depend on the chosen compute backend.
+        :return: String containing the job logs.
         """
         raise NotImplementedError
 
diff --git a/reana_job_controller/job_monitor.py b/reana_job_controller/job_monitor.py
index 415aeac4..5b564806 100644
--- a/reana_job_controller/job_monitor.py
+++ b/reana_job_controller/job_monitor.py
@@ -29,6 +29,7 @@
     SLURM_SSH_AUTH_TIMEOUT,
 )
 from reana_job_controller.job_db import JOB_DB, store_job_logs, update_job_status
+from reana_job_controller.kubernetes_job_manager import KubernetesJobManager
 from reana_job_controller.utils import SSHClient, singleton
 
 
@@ -214,70 +215,6 @@ def get_job_status(self, job_pod) -> Optional[str]:
 
         return status
 
-    def _get_containers_logs(self, job_pod) -> Optional[str]:
-        try:
-            pod_logs = ""
-            container_statuses = self._get_job_container_statuses(job_pod)
-
-            logging.info(f"Grabbing pod {job_pod.metadata.name} logs ...")
-            for container in container_statuses:
-                # If we are here, it means that either all the containers have finished
-                # running or there has been some sort of failure. For this reason we get
-                # the logs of all containers, even if they are still running, as the job
-                # will not continue running after this anyway.
-                if container.state.terminated or container.state.running:
-                    container_log = (
-                        current_k8s_corev1_api_client.read_namespaced_pod_log(
-                            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
-                            name=job_pod.metadata.name,
-                            container=container.name,
-                        )
-                    )
-                    pod_logs += "{}: :\n {}\n".format(container.name, container_log)
-                    if hasattr(container.state.terminated, "reason"):
-                        pod_logs += "\n{}\n".format(container.state.terminated.reason)
-                elif container.state.waiting:
-                    # No need to fetch logs, as the container has not started yet.
-                    pod_logs += "Container {} failed, error: {}".format(
-                        container.name, container.state.waiting.message
-                    )
-
-            return pod_logs
-        except client.rest.ApiException as e:
-            logging.error(f"Error from Kubernetes API while getting job logs: {e}")
-            return None
-        except Exception as e:
-            logging.error(traceback.format_exc())
-            logging.error("Unexpected error: {}".format(e))
-            return None
-
-    def get_job_logs(self, job_pod) -> Optional[str]:
-        """Get job logs."""
-        logs = self._get_containers_logs(job_pod)
-
-        if job_pod.status.reason == "DeadlineExceeded":
-            if not logs:
-                logs = ""
-
-            backend_job_id = self.get_backend_job_id(job_pod)
-            message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout"
-
-            try:
-                specified_timeout = job_pod.spec.active_deadline_seconds
-                message += f" of {specified_timeout} seconds."
-            except AttributeError:
-                message += "."
-                logging.error(
-                    f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec."
-                )
-
-            logs += message
-            logging.info(
-                f"Kubernetes job id: {backend_job_id} was killed due to timeout."
-            )
-
-        return logs
-
     def watch_jobs(self, job_db, app=None):
         """Open stream connection to k8s apiserver to watch all jobs status.
 
@@ -302,7 +239,9 @@
                         backend_job_id = self.get_backend_job_id(job_pod)
                         reana_job_id = self.get_reana_job_id(backend_job_id)
 
-                        logs = self.get_job_logs(job_pod)
+                        logs = self.job_manager_cls.get_logs(
+                            backend_job_id, job_pod=job_pod
+                        )
                         store_job_logs(reana_job_id, logs)
                         update_job_status(reana_job_id, job_status)
 
@@ -310,7 +249,7 @@
                         if JobStatus.should_cleanup_job(job_status):
                             self.clean_job(backend_job_id)
             except client.rest.ApiException as e:
-                logging.error(
+                logging.exception(
                     f"Error from Kubernetes API while watching jobs pods: {e}"
                 )
             except Exception as e:
@@ -414,7 +353,7 @@ def watch_jobs(self, job_db, app):
                         job_logs = app.htcondor_executor.submit(
                             self.job_manager_cls.get_logs,
                             job_dict["backend_job_id"],
-                            job_db[job_id]["obj"].workflow_workspace,
+                            workspace=job_db[job_id]["obj"].workflow_workspace,
                         )
                         logs = job_logs.result()
                         store_job_logs(job_id, logs)
diff --git a/reana_job_controller/kubernetes_job_manager.py b/reana_job_controller/kubernetes_job_manager.py
index 7327aa66..1aa7e55c 100644
--- a/reana_job_controller/kubernetes_job_manager.py
+++ b/reana_job_controller/kubernetes_job_manager.py
@@ -34,7 +34,10 @@
     validate_kubernetes_memory,
     kubernetes_memory_to_bytes,
 )
-from reana_commons.k8s.api_client import current_k8s_batchv1_api_client
+from reana_commons.k8s.api_client import (
+    current_k8s_batchv1_api_client,
+    current_k8s_corev1_api_client,
+)
 from reana_commons.k8s.kerberos import get_kerberos_k8s_config
 from reana_commons.k8s.secrets import REANAUserSecretsStore
 from reana_commons.k8s.volumes import (
@@ -243,6 +246,99 @@ def _submit(self):
             logging.exception("Unexpected error while submitting a job")
             raise
 
+    @classmethod
+    def _get_containers_logs(cls, job_pod) -> Optional[str]:
+        """Fetch the logs from all the containers in the given pod.
+
+        :param job_pod: Pod resource coming from Kubernetes.
+        """
+        try:
+            pod_logs = ""
+            container_statuses = (job_pod.status.container_statuses or []) + (
+                job_pod.status.init_container_statuses or []
+            )
+
+            logging.info(f"Grabbing pod {job_pod.metadata.name} logs ...")
+            for container in container_statuses:
+                # If we are here, it means that either all the containers have finished
+                # running or there has been some sort of failure. For this reason we get
+                # the logs of all containers, even if they are still running, as the job
+                # will not continue running after this anyway.
+                if container.state.terminated or container.state.running:
+                    container_log = (
+                        current_k8s_corev1_api_client.read_namespaced_pod_log(
+                            namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                            name=job_pod.metadata.name,
+                            container=container.name,
+                        )
+                    )
+                    pod_logs += "{}: :\n {}\n".format(container.name, container_log)
+                    if hasattr(container.state.terminated, "reason"):
+                        pod_logs += "\n{}\n".format(container.state.terminated.reason)
+                elif container.state.waiting:
+                    # No need to fetch logs, as the container has not started yet.
+                    pod_logs += "Container {} failed, error: {}".format(
+                        container.name, container.state.waiting.message
+                    )
+
+            return pod_logs
+        except client.rest.ApiException as e:
+            logging.error(f"Error from Kubernetes API while getting job logs: {e}")
+            return None
+        except Exception as e:
+            logging.error(traceback.format_exc())
+            logging.error("Unexpected error: {}".format(e))
+            return None
+
+    @classmethod
+    def get_logs(cls, backend_job_id, **kwargs):
+        """Return job logs.
+
+        :param backend_job_id: ID of the job in the backend.
+        :param kwargs: Additional parameters needed to fetch logs.
+            In the case of Kubernetes, the ``job_pod`` parameter can be specified
+            to avoid fetching the pod specification from Kubernetes.
+        :return: String containing the job logs.
+        """
+        if "job_pod" in kwargs:
+            job_pod = kwargs["job_pod"]
+            assert (
+                job_pod.metadata.labels["job-name"] == backend_job_id
+            ), "Pod does not refer to correct job."
+        else:
+            job_pods = current_k8s_corev1_api_client.list_namespaced_pod(
+                namespace=REANA_RUNTIME_KUBERNETES_NAMESPACE,
+                label_selector=f"job-name={backend_job_id}",
+            )
+            if not job_pods.items:
+                logging.error(f"Could not find any pod for job {backend_job_id}")
+                return None
+            job_pod = job_pods.items[0]
+
+        logs = cls._get_containers_logs(job_pod)
+
+        if job_pod.status.reason == "DeadlineExceeded":
+            if not logs:
+                logs = ""
+
+            message = f"\n{job_pod.status.reason}\nThe job was killed due to exceeding timeout"
+
+            try:
+                specified_timeout = job_pod.spec.active_deadline_seconds
+                message += f" of {specified_timeout} seconds."
+            except AttributeError:
+                message += "."
+                logging.error(
+                    f"Kubernetes job id: {backend_job_id}. Could not get job timeout from Job spec."
+                )
+
+            logs += message
+            logging.info(
+                f"Kubernetes job id: {backend_job_id} was killed due to timeout."
+            )
+
+        return logs
+
     def stop(backend_job_id, asynchronous=True):
         """Stop Kubernetes job execution.
 
diff --git a/reana_job_controller/slurmcern_job_manager.py b/reana_job_controller/slurmcern_job_manager.py
index 1f5569d9..e3a075a7 100644
--- a/reana_job_controller/slurmcern_job_manager.py
+++ b/reana_job_controller/slurmcern_job_manager.py
@@ -236,8 +236,19 @@ def _download_dir(sftp, remote_dir, local_dir):
             else:
                 sftp.get(remote_path, local_path)
 
-    def get_logs(backend_job_id, workspace):
-        """Return job logs if log files are present."""
+    @classmethod
+    def get_logs(cls, backend_job_id, **kwargs):
+        """Return job logs if log files are present.
+
+        :param backend_job_id: ID of the job in the backend.
+        :param kwargs: Additional parameters needed to fetch logs.
+            In the case of Slurm, the ``workspace`` parameter is needed.
+        :return: String containing the job logs.
+        """
+        if "workspace" not in kwargs:
+            raise ValueError("Missing 'workspace' parameter")
+        workspace = kwargs["workspace"]
+
         stderr_file = os.path.join(
             workspace, "reana_job." + str(backend_job_id) + ".err"
         )
diff --git a/tests/test_job_manager.py b/tests/test_job_manager.py
index 68d8bf39..10a5c14e 100644
--- a/tests/test_job_manager.py
+++ b/tests/test_job_manager.py
@@ -154,3 +154,28 @@ def cache_job(self):
     job_manager = TestJobManger("docker.io/library/busybox", "ls", {})
     job_manager.execute()
     assert job_manager.order_list == [1, 2, 3, 4]
+
+
+@pytest.mark.parametrize(
+    "k8s_phase,k8s_container_state,k8s_logs,pod_logs",
+    [
+        ("Pending", "ErrImagePull", "pull access denied", None),
+        ("Pending", "InvalidImageName", "couldn't parse image", None),
+        ("Succeeded", "Completed", None, "job finished"),
+        ("Failed", "Error", None, "job failed"),
+    ],
+)
+def test_kubernetes_get_job_logs(
+    k8s_phase, k8s_container_state, k8s_logs, pod_logs, app, kubernetes_job_pod
+):
+    """Test retrieval of job logs."""
+    k8s_corev1_api_client = mock.MagicMock()
+    k8s_corev1_api_client.read_namespaced_pod_log = lambda **kwargs: pod_logs
+    with mock.patch(
+        "reana_job_controller.kubernetes_job_manager.current_k8s_corev1_api_client",
+        k8s_corev1_api_client,
+    ):
+        job_pod = kubernetes_job_pod(k8s_phase, k8s_container_state)
+        assert (k8s_logs or pod_logs) in KubernetesJobManager.get_logs(
+            job_pod.metadata.labels["job-name"], job_pod=job_pod
+        )
diff --git a/tests/test_job_monitor.py b/tests/test_job_monitor.py
index 154da13d..34e84e90 100644
--- a/tests/test_job_monitor.py
+++ b/tests/test_job_monitor.py
@@ -39,31 +39,6 @@ def test_initialisation(app):
         JobMonitorSlurmCERN(app=app)
 
 
-@pytest.mark.parametrize(
-    "k8s_phase,k8s_container_state,k8s_logs,pod_logs",
-    [
-        ("Pending", "ErrImagePull", "pull access denied", None),
-        ("Pending", "InvalidImageName", "couldn't parse image", None),
-        ("Succeeded", "Completed", None, "job finished"),
-        ("Failed", "Error", None, "job failed"),
-    ],
-)
-def test_kubernetes_get_job_logs(
-    k8s_phase, k8s_container_state, k8s_logs, pod_logs, app, kubernetes_job_pod
-):
-    """Test retrieval of job logs."""
-    k8s_corev1_api_client = mock.MagicMock()
-    k8s_corev1_api_client.read_namespaced_pod_log = lambda **kwargs: pod_logs
-    with mock.patch.multiple(
-        "reana_job_controller.job_monitor",
-        current_k8s_corev1_api_client=k8s_corev1_api_client,
-        threading=mock.DEFAULT,
-    ):
-        job_monitor_k8s = JobMonitorKubernetes(app=app)
-        job_pod = kubernetes_job_pod(k8s_phase, k8s_container_state)
-        assert (k8s_logs or pod_logs) in job_monitor_k8s.get_job_logs(job_pod)
-
-
 @pytest.mark.parametrize(
     "k8s_phase,k8s_container_state,expected_reana_status",
     [