diff --git a/.gitignore b/.gitignore index aa592830..507ff6e9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,7 @@ dist/ # Installer logs pip-log.txt pip-delete-this-directory.txt +build/ # Sphinx documentation docs/build/ diff --git a/wfcommons/wfchef/wfchef_abstract_recipe.py b/wfcommons/wfchef/wfchef_abstract_recipe.py index 40be9aed..83c61fe1 100644 --- a/wfcommons/wfchef/wfchef_abstract_recipe.py +++ b/wfcommons/wfchef/wfchef_abstract_recipe.py @@ -17,6 +17,7 @@ from logging import Logger from typing import Any, Dict, List, Optional, Set, Union +from wfcommons.common.task import Task from wfcommons.common.workflow import Workflow from wfcommons.wfchef.duplicate import duplicate from wfcommons.wfgen.abstract_recipe import WorkflowRecipe @@ -31,6 +32,7 @@ class BaseMethod(Enum): BIGGEST = 2 RANDOM = 3 + this_dir = pathlib.Path(__file__).resolve().parent @@ -116,11 +118,12 @@ def from_num_tasks(cls, :rtype: WfChefWorkflowRecipe """ - return cls(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor, + return cls(num_tasks=num_tasks, + exclude_graphs=exclude_graphs, + runtime_factor=runtime_factor, input_file_size_factor=input_file_size_factor, output_file_size_factor=output_file_size_factor) - def generate_nx_graph(self) -> nx.DiGraph: summary_path = self.this_dir.joinpath("microstructures", "summary.json") summary = json.loads(summary_path.read_text()) @@ -135,24 +138,24 @@ def generate_nx_graph(self) -> nx.DiGraph: reference_orders = [summary["base_graphs"][col]["order"] for col in df.columns] idx = np.argmin([abs(self.num_tasks - ref_num_tasks) for ref_num_tasks in reference_orders]) reference = df.columns[idx] - + if self.base_method == BaseMethod.ERROR_TABLE: base = df.index[df[reference].argmin()] elif self.base_method == BaseMethod.SMALLEST: - base = min( + base = min( [k for k in summary["base_graphs"].keys() if summary["base_graphs"][k] not in self.exclude_graphs], key=lambda k: summary["base_graphs"][k]["order"] ) elif self.base_method == BaseMethod.BIGGEST: base = max( - [k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and - summary["base_graphs"][k] not in self.exclude_graphs], + [k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and + summary["base_graphs"][k] not in self.exclude_graphs], key=lambda k: summary["base_graphs"][k]["order"] ) else: base = random.choice( - [k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and - summary["base_graphs"][k] not in self.exclude_graphs] + [k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and + summary["base_graphs"][k] not in self.exclude_graphs] ) graph = duplicate(self.this_dir.joinpath("microstructures"), base, self.num_tasks) @@ -167,7 +170,8 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: :return: A synthetic workflow instance object. :rtype: Workflow """ - workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name, makespan=None) + workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name, + makespan=0) graph = self.generate_nx_graph() task_names = {} @@ -181,11 +185,30 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow: task_names[node] = task_name + # tasks dependencies for (src, dst) in graph.edges: if src in ["SRC", "DST"] or dst in ["SRC", "DST"]: continue workflow.add_edge(task_names[src], task_names[dst]) + if task_names[src] not in self.tasks_children: + self.tasks_children[task_names[src]] = [] + if task_names[dst] not in self.tasks_parents: + self.tasks_parents[task_names[dst]] = [] + + self.tasks_children[task_names[src]].append(task_names[dst]) + self.tasks_parents[task_names[dst]].append(task_names[src]) + + # find leaf tasks + leaf_tasks = [] + for node_name in workflow.nodes: + task: Task = workflow.nodes[node_name]['task'] + if task.name not in self.tasks_children: + leaf_tasks.append(task) + + for task in leaf_tasks: + self._generate_task_files(task) + workflow.nxgraph = graph self.workflows.append(workflow) return workflow @@ -195,5 +218,3 @@ def _load_base_graph(self) -> nx.DiGraph: def _load_microstructures(self) -> Dict: return json.loads(self.this_dir.joinpath("microstructures.json").read_text()) - - diff --git a/wfcommons/wfgen/abstract_recipe.py b/wfcommons/wfgen/abstract_recipe.py index 7656346a..fe092b67 100644 --- a/wfcommons/wfgen/abstract_recipe.py +++ b/wfcommons/wfgen/abstract_recipe.py @@ -67,6 +67,9 @@ def __init__(self, name: str, self.tasks_files: Dict[str, List[File]] = {} self.tasks_files_names: Dict[str, List[str]] = {} self.task_id_counter = 1 + self.tasks_map = {} + self.tasks_children = {} + self.tasks_parents = {} @abstractmethod def _workflow_recipe(self) -> Dict[str, Any]: