Skip to content

Commit

Permalink
fix: missing output rewrite handling for nested path arrays
Browse files Browse the repository at this point in the history
  • Loading branch information
davidlougheed committed Mar 12, 2024
1 parent 95bf5b0 commit f74d0df
Showing 1 changed file with 23 additions and 8 deletions.
31 changes: 23 additions & 8 deletions bento_wes/backends/cromwell_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from flask import current_app, json
from pathlib import Path
from typing import TypeVar

from bento_wes.backends import WESBackend
from bento_wes.backends.backend_types import Command
Expand All @@ -14,6 +15,8 @@
]


T = TypeVar("T")

# Spec: https://software.broadinstitute.org/wdl/documentation/spec#whitespace-strings-identifiers-constants
WDL_WORKSPACE_NAME_REGEX = re.compile(r"workflow\s+([a-zA-Z][a-zA-Z0-9_]+)")

Expand Down Expand Up @@ -87,6 +90,21 @@ def _get_command(self, workflow_path: Path, params_path: Path, run_dir: Path) ->
str(workflow_path),
))

@staticmethod
def _rewrite_tmp_dir_paths(v: T, tmp_dir_str: str, output_dir_str: str) -> T:
    """
    Rewrite temporary-directory path prefixes in a workflow output value.

    If we have a file output, it should be a path starting with a prefix like
    /<tmp_dir>/cromwell-executions/... from executing Cromwell with the PWD as /<tmp_dir>/.
    Cromwell outputs the same folder structure in whatever is set for `final_workflow_outputs_dir`
    in _get_command(), so this prefix can be swapped for the output directory, which is preserved
    after the run is finished.

    :param v: A workflow output value: a string, a (possibly nested) list, or any other value.
    :param tmp_dir_str: The path prefix to replace.
    :param output_dir_str: The replacement prefix.
    :return: A value of the same shape as `v`. Strings starting with `tmp_dir_str` have that prefix
             replaced by `output_dir_str`; lists are rebuilt with every element rewritten
             recursively; everything else is returned unchanged.
    """

    def _rewrite(item):
        if isinstance(item, str):
            # Only touch strings that actually live under the temporary directory. Without this
            # check, ordinary string outputs (non-paths) would be sliced and prefixed, corrupting
            # their value.
            if item.startswith(tmp_dir_str):
                return output_dir_str + item[len(tmp_dir_str):]
            return item
        if isinstance(item, list):
            # If we have a list, it may be a nested list of paths, so rewrite recursively:
            return [_rewrite(w) for w in item]
        return item

    return _rewrite(v)

def get_workflow_outputs(self, run: RunWithDetails) -> dict[str, dict]:
p = self.execute_womtool_command(("outputs", str(self.workflow_path(run))))

Expand All @@ -102,13 +120,10 @@ def get_workflow_outputs(self, run: RunWithDetails) -> dict[str, dict]:
tmp_dir_str = str(self.tmp_dir / "cromwell-executions")
output_dir_str = str(self.output_dir)

outputs_with_type = {}
for k, v in outputs.items():
if isinstance(v, str) and v.startswith(tmp_dir_str):
v = output_dir_str + v[len(tmp_dir_str):]
outputs_with_type[k] = {
return {
k: {
"type": workflow_types[k],
"value": v,
"value": self._rewrite_tmp_dir_paths(v, tmp_dir_str, output_dir_str),
}

return outputs_with_type
for k, v in outputs.items()
}

0 comments on commit f74d0df

Please sign in to comment.