Skip to content

Commit

Permalink
feat: start implementing injectable config + new bento_lib/authz stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Oct 23, 2023
1 parent be8e08e commit 217974d
Show file tree
Hide file tree
Showing 10 changed files with 157 additions and 174 deletions.
9 changes: 3 additions & 6 deletions bento_wes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,11 @@ def service_info():

try:
if res_tag := subprocess.check_output(["git", "describe", "--tags", "--abbrev=0"]):
res_tag_str = res_tag.decode().rstrip()
info["bento"]["gitTag"] = res_tag_str
info["bento"]["gitTag"] = res_tag.decode().rstrip()
if res_branch := subprocess.check_output(["git", "branch", "--show-current"]):
res_branch_str = res_branch.decode().rstrip()
info["bento"]["gitBranch"] = res_branch_str
info["bento"]["gitBranch"] = res_branch.decode().rstrip()
if res_commit := subprocess.check_output(["git", "rev-parse", "HEAD"]):
res_commit_str = res_commit.decode().rstrip()
info["bento"]["gitCommit"] = res_commit_str
info["bento"]["gitCommit"] = res_commit.decode().rstrip()
except Exception as e:
except_name = type(e).__name__
current_app.logger.info(f"Could not retrieve git information: {str(except_name)}: {e}")
Expand Down
5 changes: 0 additions & 5 deletions bento_wes/authz.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,10 @@

__all__ = [
"authz_middleware",
"PERMISSION_INGEST_DATA",
"PERMISSION_VIEW_RUNS",
]

authz_middleware = FlaskAuthMiddleware(
config.AUTHZ_URL,
debug_mode=config.BENTO_DEBUG,
enabled=config.AUTHZ_ENABLED,
)

PERMISSION_INGEST_DATA = "ingest:data"
PERMISSION_VIEW_RUNS = "view:runs"
132 changes: 54 additions & 78 deletions bento_wes/backends/_wes_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
from abc import ABC, abstractmethod
from bento_lib.events import EventBus
from bento_lib.events.types import EVENT_WES_RUN_FINISHED
from bento_lib.workflows.models import WorkflowSecretInput
from bento_lib.workflows.utils import namespaced_input
from flask import current_app
from typing import Any

from bento_wes import states
from bento_wes.constants import SERVICE_ARTIFACT, RUN_PARAM_FROM_CONFIG
from bento_wes.constants import SERVICE_ARTIFACT
from bento_wes.db import get_db, finish_run, update_run_state_and_commit
from bento_wes.models import Run, RunWithDetails, BentoWorkflowInputWithValue
from bento_wes.models import Run, RunWithDetails
from bento_wes.states import STATE_EXECUTOR_ERROR, STATE_SYSTEM_ERROR
from bento_wes.utils import iso_now
from bento_wes.workflows import WorkflowType, WorkflowManager
Expand All @@ -28,13 +30,8 @@
# Spec: https://software.broadinstitute.org/wdl/documentation/spec#whitespace-strings-identifiers-constants
WDL_WORKSPACE_NAME_REGEX = re.compile(r"workflow\s+([a-zA-Z][a-zA-Z0-9_]+)")

PARAM_SECRET_PREFIX = "secret__"
ParamDict = dict[str, str | int | float | bool]

FORBIDDEN_FROM_CONFIG_PARAMS = (
"WES_CLIENT_SECRET",
)


class WESBackend(ABC):
def __init__(
Expand Down Expand Up @@ -305,126 +302,103 @@ def _initialize_run_and_get_command(
self,
run: RunWithDetails,
celery_id: int,
access_token: str,
secrets: dict[str, str],
) -> tuple[Command, dict] | None:
"""
Performs "initialization" operations on the run, including setting states, downloading and validating the
workflow file, and generating and logging the workflow-running command.
:param run: The run to initialize
:param celery_id: The Celery ID of the Celery task responsible for executing the run
:param access_token: An access token for talking with this Bento instance's services
:param secrets: A dictionary of secrets (e.g., tokens) to be injected as parameters (potentially) but not stored
in the database.
:return: The command to execute, if no errors occurred; None otherwise
"""

self._update_run_state_and_commit(run.run_id, states.STATE_INITIALIZING)

run_dir = self.run_dir(run)

# -- Check that the run directory exists ------------------------------
# -- Check that the run directory exists -----------------------------------------------------------------------
if not os.path.exists(run_dir):
# TODO: Log error in run log
self.log_error("Run directory not found")
return self._finish_run_and_clean_up(run, states.STATE_SYSTEM_ERROR)

c = self.db.cursor()
run_req = run.request

workflow_id = run.request.tags.workflow_id
workflow_params: ParamDict = {
**run.request.workflow_params,

# TODO: only if requested in inputs
f"{workflow_id}.{PARAM_SECRET_PREFIX}access_token": access_token,

# In export/analysis mode, as we rely on services located in different containers
# there is a need to have designated folders on shared volumes between
# WES and the other services, to write files to.
# This is possible because /wes/tmp is a volume mounted with the same
# path in each data service (except Gohan which mounts the dropbox
# data-x directory directly instead, to avoid massive duplicates).
# Also, the Toil library creates directories in the generic /tmp/
# directory instead of the one that is configured for the job execution,
# to create the current working directories where tasks are executed.
# These files are inaccessible to other containers in the context of a
# task unless they are written arbitrarily to run_dir
# TODO: only if requested in inputs
f"{workflow_id}.run_dir": run_dir,

# TODO: more special parameters: service URLs, system__run_dir...
}

# Some workflow parameters depend on the WES application configuration
# and need to be added from there.
# The reserved keyword `FROM_CONFIG` is used to detect those inputs.
# All parameters in config are upper case. e.g. drs_url --> DRS_URL
for i in run.request.tags.workflow_metadata.inputs:
self.log_debug(f"Found workflow input for run {run.run_id}: {str(i)}")
if not isinstance(i, BentoWorkflowInputWithValue) or i.value != RUN_PARAM_FROM_CONFIG:
continue
input_id_upper = i.id.upper()
if input_id_upper in FORBIDDEN_FROM_CONFIG_PARAMS: # Cannot grab WES client secret, for example
self.log_warning(f"Cannot inject forbidden WES config item: {i.id}")
continue
param_val = current_app.config.get(input_id_upper, "")
workflow_params[f"{workflow_id}.{i.id}"] = param_val
self.log_debug(f"Injecting FROM_CONFIG param to {workflow_id}: {i.id}={param_val}")

# -- Validate the workflow --------------------------------------------
# run_req.workflow_params now includes non-secret injected values since it was read from the database after
# the run ID was passed to the runner:
workflow_params_with_secrets: ParamDict = {**run_req.workflow_params}

# -- Find which inputs are secrets, which need to be injected here (so they don't end up in the database) ------
for run_input in run_req.tags.workflow_metadata.inputs:
if isinstance(run_input, WorkflowSecretInput):
secret_value = secrets.get(run_input.key)
if secret_value is None:
err = f"Could not find injectable secret for key {run_input.key}"
self.log_error(err)
return self._finish_run_and_clean_up(run, STATE_EXECUTOR_ERROR)
workflow_params_with_secrets[namespaced_input(run_req.tags.workflow_id, run_input.id)] = secret_value

# -- Validate the workflow -------------------------------------------------------------------------------------
error = self._check_workflow_and_type(run)
if error is not None:
self.log_error(error[0])
return self._finish_run_and_clean_up(run, error[1])

# -- Find "real" workflow name from workflow file ---------------------
# -- Find "real" workflow name from workflow file --------------------------------------------------------------
workflow_name = self.get_workflow_name(self.workflow_path(run))
if workflow_name is None:
# Invalid/non-workflow-specifying workflow file
self.log_error("Could not find workflow name in workflow file")
return self._finish_run_and_clean_up(run, states.STATE_SYSTEM_ERROR)

c = self.db.cursor()

# TODO: To avoid having multiple names, we should maybe only set this once?
c.execute("UPDATE runs SET run_log__name = ? WHERE id = ?", (workflow_name, run.run_id))
self.db.commit()

# -- Store input for the workflow in a file in the temporary folder ---
# -- Store input for the workflow in a file in the temporary folder --------------------------------------------
with open(self._params_path(run), "w") as pf:
pf.write(self._serialize_params(workflow_params))
pf.write(self._serialize_params(workflow_params_with_secrets))

# -- Create the runner command based on inputs ------------------------
# -- Create the runner command based on inputs -----------------------------------------------------------------
cmd = self._get_command(self.workflow_path(run), self._params_path(run), self.run_dir(run))

# -- Update run log with command and Celery ID ------------------------
# -- Update run log with command and Celery ID -----------------------------------------------------------------
c.execute(
"UPDATE runs SET run_log__cmd = ?, run_log__celery_id = ? WHERE id = ?",
(" ".join(cmd), celery_id, run.run_id))
self.db.commit()

return cmd, workflow_params
return cmd, workflow_params_with_secrets

@abstractmethod
def get_workflow_outputs(self, run_dir: str) -> dict[str, Any]:
pass

def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_extras: ParamDict) -> ProcessResult | None:
def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_secrets: ParamDict) -> ProcessResult | None:
"""
Performs a run based on a provided command and returns stdout, stderr, exit code, and whether the process timed
out while running.
:param run: The run to execute
:param cmd: The command used to execute the run
:param params_with_extras: A dictionary of parameters, including secret values
:param params_with_secrets: A dictionary of parameters, including secret values
:return: A ProcessResult tuple of (stdout, stderr, exit_code, timed_out)
"""

c = self.db.cursor()

# Perform run =========================================================
# Perform run ==================================================================================================

# -- Start process running the generated command ----------------------
# -- Start process running the generated command ---------------------------------------------------------------
runner_process = subprocess.Popen(
cmd, cwd=self.tmp_dir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8")
c.execute("UPDATE runs SET run_log__start_time = ? WHERE id = ?", (iso_now(), run.run_id))
self._update_run_state_and_commit(run.run_id, states.STATE_RUNNING)

# -- Wait for and capture output --------------------------------------
# -- Wait for and capture output -------------------------------------------------------------------------------

timed_out = False

Expand All @@ -439,18 +413,19 @@ def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_extras: Pa
finally:
exit_code = runner_process.returncode

# -- Censor output in case it includes any secrets
# -- Censor output in case it includes any secrets -------------------------------------------------------------

for k, v in params_with_extras.items():
if PARAM_SECRET_PREFIX not in k:
continue
if isinstance(v, str) and len(v) >= 5: # redacted secrets must be somewhat lengthy
workflow_id = run.request.tags.workflow_id
req_secret_inputs = (i for i in run.request.tags.workflow_metadata.inputs if isinstance(i, WorkflowSecretInput))
for req_secret_input in req_secret_inputs:
v = params_with_secrets.get(namespaced_input(workflow_id, req_secret_input.id))
if isinstance(v, str) and len(v) > 1: # don't "censor" blank strings/single characters
stdout = stdout.replace(v, "<redacted>")
stderr = stderr.replace(v, "<redacted>")

# Complete run ========================================================
# Complete run =================================================================================================

# -- Update run log with stdout/stderr, exit code ---------------------
# -- Update run log with stdout/stderr, exit code --------------------------------------------------------------
# - Explicitly don't commit here; sync with state update
c.execute("UPDATE runs SET run_log__stdout = ?, run_log__stderr = ?, run_log__exit_code = ? WHERE id = ?",
(stdout, stderr, exit_code, run.run_id))
Expand All @@ -460,7 +435,7 @@ def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_extras: Pa
self.log_error("Encountered timeout while performing run")
return self._finish_run_and_clean_up(run, states.STATE_SYSTEM_ERROR)

# -- Final steps: check exit code and report results ------------------
# -- Final steps: check exit code and report results -----------------------------------------------------------

if exit_code != 0:
# TODO: Report error somehow
Expand Down Expand Up @@ -495,12 +470,13 @@ def _perform_run(self, run: RunWithDetails, cmd: Command, params_with_extras: Pa

return ProcessResult((stdout, stderr, exit_code, timed_out))

def perform_run(self, run: RunWithDetails, celery_id: int, access_token: str) -> ProcessResult | None:
def perform_run(self, run: RunWithDetails, celery_id: int, secrets: dict[str, str]) -> ProcessResult | None:
"""
Executes a run from start to finish (initialization, startup, and completion / cleanup.)
:param run: The run to execute
:param celery_id: The ID of the Celery task responsible for executing the workflow
:param access_token: An access token for talking with this Bento instance's services
:param secrets: A dictionary of secrets (e.g., tokens) to be injected as parameters (potentially) but not stored
in the database.
:return: A ProcessResult tuple of (stdout, stderr, exit_code, timed_out)
"""

Expand All @@ -511,12 +487,12 @@ def perform_run(self, run: RunWithDetails, celery_id: int, access_token: str) ->

self._runs[run.run_id] = run

# Initialization (loading / downloading files) ------------------------
init_vals = self._initialize_run_and_get_command(run, celery_id, access_token)
# Initialization (loading / downloading files + secrets injection) ---------------------------------------------
init_vals = self._initialize_run_and_get_command(run, celery_id, secrets)
if init_vals is None:
return

cmd, params_with_extras = init_vals
cmd, params_with_secrets = init_vals

# Perform, finish, and clean up run -----------------------------------
return self._perform_run(run, cmd, params_with_extras)
# Perform, finish, and clean up run ----------------------------------------------------------------------------
return self._perform_run(run, cmd, params_with_secrets)
22 changes: 8 additions & 14 deletions bento_wes/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,16 @@ def _get_from_environ_or_fail(var: str) -> str:
return val


def _to_bool(val: str) -> bool:
return val.strip().lower() in TRUTH_VALUES


TRUTH_VALUES = ("true", "1")

AUTHZ_ENABLED = os.environ.get("AUTHZ_ENABLED", "true").strip().lower() in TRUTH_VALUES

BENTO_DEBUG: bool = os.environ.get(
"BENTO_DEBUG",
os.environ.get("FLASK_DEBUG", "false")
).strip().lower() in TRUTH_VALUES

CELERY_DEBUG: bool = os.environ.get(
"CELERY_DEBUG", ""
).strip().lower() in TRUTH_VALUES
BENTO_DEBUG: bool = _to_bool(os.environ.get("BENTO_DEBUG", os.environ.get("FLASK_DEBUG", "false")))
CELERY_DEBUG: bool = _to_bool(os.environ.get("CELERY_DEBUG", ""))

AUTHZ_URL: str = _get_from_environ_or_fail("BENTO_AUTHZ_SERVICE_URL").strip().rstrip("/")
SERVICE_REGISTRY_URL: str = _get_from_environ_or_fail("SERVICE_REGISTRY_URL").strip().rstrip("/")
Expand All @@ -47,8 +45,7 @@ class Config:
BENTO_URL: str = os.environ.get("BENTO_URL", "http://127.0.0.1:5000/")

BENTO_DEBUG: bool = BENTO_DEBUG
BENTO_VALIDATE_SSL: bool = os.environ.get(
"BENTO_VALIDATE_SSL", str(not BENTO_DEBUG)).strip().lower() in TRUTH_VALUES
BENTO_VALIDATE_SSL: bool = _to_bool(os.environ.get("BENTO_VALIDATE_SSL", str(not BENTO_DEBUG)))

DATABASE: str = os.environ.get("DATABASE", "bento_wes.db")
SERVICE_ID = SERVICE_ID
Expand All @@ -74,10 +71,7 @@ class Config:
WES_CLIENT_ID: str = os.environ.get("WES_CLIENT_ID", "bento_wes")
WES_CLIENT_SECRET: str = os.environ.get("WES_CLIENT_SECRET", "")

# Other services, used for interpolating workflow variables and (
DRS_URL: str = os.environ.get("DRS_URL", "").strip().rstrip("/")
GOHAN_URL: str = os.environ.get("GOHAN_URL", "").strip().rstrip("/")
KATSU_URL: str = os.environ.get("KATSU_URL", "").strip().rstrip("/")
# Service registry URL, used for looking up service kinds to inject as workflow input
SERVICE_REGISTRY_URL: str = SERVICE_REGISTRY_URL

# VEP-related configuration
Expand Down
3 changes: 0 additions & 3 deletions bento_wes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
"SERVICE_TYPE",
"SERVICE_ID",
"SERVICE_NAME",
"RUN_PARAM_FROM_CONFIG",
]

BENTO_SERVICE_KIND = "wes"
Expand All @@ -22,5 +21,3 @@
}
SERVICE_ID = os.environ.get("SERVICE_ID", ":".join(SERVICE_TYPE.values()))
SERVICE_NAME = "Bento WES"

RUN_PARAM_FROM_CONFIG: Literal["FROM_CONFIG"] = "FROM_CONFIG"
Loading

0 comments on commit 217974d

Please sign in to comment.