Skip to content

Commit

Permalink
#17: fixing data dependency issue
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelfsilva committed Aug 23, 2021
1 parent 564f732 commit 26c8552
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dist/
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
build/

# Sphinx documentation
docs/build/
Expand Down
43 changes: 32 additions & 11 deletions wfcommons/wfchef/wfchef_abstract_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

from logging import Logger
from typing import Any, Dict, List, Optional, Set, Union
from wfcommons.common.task import Task
from wfcommons.common.workflow import Workflow
from wfcommons.wfchef.duplicate import duplicate
from wfcommons.wfgen.abstract_recipe import WorkflowRecipe
Expand All @@ -31,6 +32,7 @@ class BaseMethod(Enum):
BIGGEST = 2
RANDOM = 3


this_dir = pathlib.Path(__file__).resolve().parent


Expand Down Expand Up @@ -116,11 +118,12 @@ def from_num_tasks(cls,
:rtype: WfChefWorkflowRecipe
"""
return cls(num_tasks=num_tasks, exclude_graphs=exclude_graphs, runtime_factor=runtime_factor,
return cls(num_tasks=num_tasks,
exclude_graphs=exclude_graphs,
runtime_factor=runtime_factor,
input_file_size_factor=input_file_size_factor,
output_file_size_factor=output_file_size_factor)


def generate_nx_graph(self) -> nx.DiGraph:
summary_path = self.this_dir.joinpath("microstructures", "summary.json")
summary = json.loads(summary_path.read_text())
Expand All @@ -135,24 +138,24 @@ def generate_nx_graph(self) -> nx.DiGraph:
reference_orders = [summary["base_graphs"][col]["order"] for col in df.columns]
idx = np.argmin([abs(self.num_tasks - ref_num_tasks) for ref_num_tasks in reference_orders])
reference = df.columns[idx]

if self.base_method == BaseMethod.ERROR_TABLE:
base = df.index[df[reference].argmin()]
elif self.base_method == BaseMethod.SMALLEST:
base = min(
base = min(
[k for k in summary["base_graphs"].keys() if summary["base_graphs"][k] not in self.exclude_graphs],
key=lambda k: summary["base_graphs"][k]["order"]
)
elif self.base_method == BaseMethod.BIGGEST:
base = max(
[k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and
summary["base_graphs"][k] not in self.exclude_graphs],
[k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and
summary["base_graphs"][k] not in self.exclude_graphs],
key=lambda k: summary["base_graphs"][k]["order"]
)
else:
base = random.choice(
[k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and
summary["base_graphs"][k] not in self.exclude_graphs]
[k for k in summary["base_graphs"].keys() if summary["base_graphs"][k]["order"] <= self.num_tasks and
summary["base_graphs"][k] not in self.exclude_graphs]
)

graph = duplicate(self.this_dir.joinpath("microstructures"), base, self.num_tasks)
Expand All @@ -167,7 +170,8 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:
:return: A synthetic workflow instance object.
:rtype: Workflow
"""
workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name, makespan=None)
workflow = Workflow(name=self.name + "-synthetic-instance" if not workflow_name else workflow_name,
makespan=0)
graph = self.generate_nx_graph()

task_names = {}
Expand All @@ -181,11 +185,30 @@ def build_workflow(self, workflow_name: Optional[str] = None) -> Workflow:

task_names[node] = task_name

# tasks dependencies
for (src, dst) in graph.edges:
if src in ["SRC", "DST"] or dst in ["SRC", "DST"]:
continue
workflow.add_edge(task_names[src], task_names[dst])

if task_names[src] not in self.tasks_children:
self.tasks_children[task_names[src]] = []
if task_names[dst] not in self.tasks_parents:
self.tasks_parents[task_names[dst]] = []

self.tasks_children[task_names[src]].append(task_names[dst])
self.tasks_parents[task_names[dst]].append(task_names[src])

# find leaf tasks
leaf_tasks = []
for node_name in workflow.nodes:
task: Task = workflow.nodes[node_name]['task']
if task.name not in self.tasks_children:
leaf_tasks.append(task)

for task in leaf_tasks:
self._generate_task_files(task)

workflow.nxgraph = graph
self.workflows.append(workflow)
return workflow
Expand All @@ -195,5 +218,3 @@ def _load_base_graph(self) -> nx.DiGraph:

def _load_microstructures(self) -> Dict:
return json.loads(self.this_dir.joinpath("microstructures.json").read_text())


3 changes: 3 additions & 0 deletions wfcommons/wfgen/abstract_recipe.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ def __init__(self, name: str,
self.tasks_files: Dict[str, List[File]] = {}
self.tasks_files_names: Dict[str, List[str]] = {}
self.task_id_counter = 1
self.tasks_map = {}
self.tasks_children = {}
self.tasks_parents = {}

@abstractmethod
def _workflow_recipe(self) -> Dict[str, Any]:
Expand Down

0 comments on commit 26c8552

Please sign in to comment.