Skip to content

Commit

Permalink
externalbackend: add kubernetes_job_timeout parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladyslav Moisieienkov committed Dec 20, 2021
1 parent 601477f commit ac194f6
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 57 deletions.
1 change: 1 addition & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Changes
Version 0.8.1 (UNRELEASED)
---------------------------

- Adds support for specifying ``kubernetes_job_timeout`` for Kubernetes compute backend jobs.
- Fixes workflow stuck in pending status due to early Yadage fail.

Version 0.8.0 (2021-11-22)
Expand Down
89 changes: 32 additions & 57 deletions reana_workflow_engine_yadage/externalbackend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2017-2021 CERN.
#
Expand All @@ -10,6 +8,7 @@
import base64
import logging
import os
from typing import Any, Dict, List, Union

from packtivity.asyncbackends import ExternalAsyncProxy
from packtivity.syncbackends import build_job, finalize_inputs, packconfig, publish
Expand Down Expand Up @@ -56,13 +55,42 @@ def __init__(self):
self.jobs_statuses = {}
self._fail_info = ""

@staticmethod
def _get_resources(resources: List[Union[Dict, Any]]) -> Dict[str, Any]:
parameters = {}

def set_parameter(resource: Dict[str, Any], key: str) -> None:
if key in resource and resource[key] is not None:
parameters[key] = resource[key]

for item in resources:
if not isinstance(item, dict):
log.info(
"REANA only supports dictionary entries for resources. "
f'"{item}" value is not formatted in such a way and will be ignored.'
)
continue
set_parameter(item, "kerberos")
set_parameter(item, "compute_backend")
set_parameter(item, "kubernetes_uid")
set_parameter(item, "kubernetes_memory_limit")
set_parameter(item, "kubernetes_job_timeout")
set_parameter(item, "unpacked_img")
set_parameter(item, "voms_proxy")
set_parameter(item, "htcondor_max_runtime")
set_parameter(item, "htcondor_accounting_group")
return parameters

def submit( # noqa: C901
self, spec, parameters, state, metadata # noqa: C901
) -> ReanaExternalProxy: # noqa: C901
"""Submit a yadage packtivity to RJC."""
parameters, state = finalize_inputs(parameters, state)
job = build_job(spec["process"], parameters, state, self.config)

log.debug(f"state context is {state}")
state.ensure()

if "command" in job:
prettified_cmd = wrapped_cmd = job["command"]
elif "script" in job:
Expand All @@ -74,46 +102,11 @@ def submit( # noqa: C901
if imagetag:
image = f"{image}:{imagetag}"

kerberos = None
compute_backend = None
kubernetes_uid = None
kubernetes_memory_limit = None
unpacked_img = None
voms_proxy = None
htcondor_max_runtime = None
htcondor_accounting_group = None

resources = spec["environment"].get("resources", [])
for item in resources:
if not isinstance(item, dict):
log.info(
'REANA only supports dictionary entries for resources. "{0}" value is not formatted in such a way and will be ignored.'.format(
item
)
)
continue
if "kerberos" in item.keys():
kerberos = item["kerberos"]
if "compute_backend" in item.keys():
compute_backend = item["compute_backend"]
if "kubernetes_uid" in item.keys():
kubernetes_uid = item["kubernetes_uid"]
if "kubernetes_memory_limit" in item.keys():
kubernetes_memory_limit = item["kubernetes_memory_limit"]
if "unpacked_img" in item.keys():
unpacked_img = item["unpacked_img"]
if "voms_proxy" in item.keys():
voms_proxy = item["voms_proxy"]
if "htcondor_max_runtime" in item.keys():
htcondor_max_runtime = item["htcondor_max_runtime"]
if "htcondor_accounting_group" in item.keys():
htcondor_accounting_group = item["htcondor_accounting_group"]
resources_parameters = self._get_resources(resources)

log.debug(f"state context is {state}")
log.debug(f"would run job {job}")

state.ensure()

workflow_uuid = os.getenv("workflow_uuid", "default")
job_request_body = {
"workflow_uuid": workflow_uuid,
Expand All @@ -123,27 +116,9 @@ def submit( # noqa: C901
"workflow_workspace": os.getenv("workflow_workspace", "default"),
"job_name": metadata["name"],
"cvmfs_mounts": MOUNT_CVMFS,
**resources_parameters,
}

if compute_backend:
job_request_body["compute_backend"] = compute_backend
if kerberos:
job_request_body["kerberos"] = kerberos
if kubernetes_uid:
job_request_body["kubernetes_uid"] = kubernetes_uid
if kubernetes_memory_limit:
job_request_body["kubernetes_memory_limit"] = kubernetes_memory_limit
if unpacked_img:
job_request_body["unpacked_img"] = unpacked_img
if voms_proxy:
job_request_body["voms_proxy"] = voms_proxy
if htcondor_max_runtime:
job_request_body["htcondor_max_runtime"] = htcondor_max_runtime
if htcondor_accounting_group:
job_request_body["htcondor_accounting_group"] = htcondor_accounting_group

log.debug("Submitting job")

job_submit_response = self.rjc_api_client.submit(**job_request_body)
job_id = job_submit_response.get("job_id")

Expand Down
38 changes: 38 additions & 0 deletions tests/test_externalbackend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# This file is part of REANA.
# Copyright (C) 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""REANA-Workflow-Engine-Yadage ExternalBackend tests."""

from typing import List, Dict, Any, Union

import pytest


class TestExternalBackend:
@pytest.mark.parametrize(
"input_parameters,final_parameters",
[
(
[
{"compute_backend": "kubernetes"},
{"not_exists": "value"},
{"kubernetes_job_timeout": 20},
{"kubernetes_memory_limit": None},
],
{"compute_backend": "kubernetes", "kubernetes_job_timeout": 20},
),
(
[{"kubernetes_job_timeout": 10}, {"kubernetes_job_timeout": 30}],
{"kubernetes_job_timeout": 30},
),
],
)
def test_get_resources(
self, input_parameters: List[Union[Dict, Any]], final_parameters: Dict[str, Any]
):
from reana_workflow_engine_yadage.externalbackend import ExternalBackend

assert ExternalBackend._get_resources(input_parameters) == final_parameters

0 comments on commit ac194f6

Please sign in to comment.