Skip to content

Commit

Permalink
Merge pull request #274 from conductor-sdk/fix/wf-as-code-dynamic-for…
Browse files Browse the repository at this point in the history
…k-issue

FIX - Dynamic Fork (workflow as code)
  • Loading branch information
jmigueprieto authored Jan 14, 2025
2 parents 2d311a1 + 38a154d commit 436585e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 15 deletions.
20 changes: 7 additions & 13 deletions src/conductor/client/workflow/task/dynamic_fork_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,21 @@


class DynamicForkTask(TaskInterface):
def __init__(self, task_ref_name: str, pre_fork_task: TaskInterface, join_task: JoinTask = None) -> Self:
def __init__(self, task_ref_name: str, tasks_param: str = 'dynamicTasks', tasks_input_param_name: str = 'dynamicTasksInputs', join_task: JoinTask = None) -> Self:
super().__init__(
task_reference_name=task_ref_name,
task_type=TaskType.FORK_JOIN_DYNAMIC
)
self._pre_fork_task = deepcopy(pre_fork_task)
self.tasks_param = tasks_param
self.tasks_input_param_name = tasks_input_param_name
self._join_task = deepcopy(join_task)

def to_workflow_task(self) -> WorkflowTask:
workflow = super().to_workflow_task()
workflow.dynamic_fork_join_tasks_param = 'forkedTasks'
workflow.dynamic_fork_tasks_input_param_name = 'forkedTasksInputs'
workflow.input_parameters['forkedTasks'] = self._pre_fork_task.output_ref(
'forkedTasks'
)
workflow.input_parameters['forkedTasksInputs'] = self._pre_fork_task.output_ref(
'forkedTasksInputs'
)
wf_task = super().to_workflow_task()
wf_task.dynamic_fork_join_tasks_param = self.tasks_param
wf_task.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name
tasks = [
self._pre_fork_task.to_workflow_task(),
workflow,
wf_task,
]
if self._join_task != None:
tasks.append(self._join_task.to_workflow_task())
Expand Down
8 changes: 7 additions & 1 deletion src/conductor/client/workflow/task/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,13 @@
def get_task_interface_list_as_workflow_task_list(*tasks: Self) -> List[WorkflowTask]:
converted_tasks = []
for task in tasks:
converted_tasks.append(task.to_workflow_task())
wf_task = task.to_workflow_task()
if isinstance(wf_task, list):
# to_workflow_task() returned a list. E.g.: DynamicFork.to_workflow_task() returns the DynamicFork and the Join task.
for t in wf_task:
converted_tasks.append(t)
else:
converted_tasks.append(task.to_workflow_task())
return converted_tasks


Expand Down
1 change: 0 additions & 1 deletion tests/integration/metadata/test_workflow_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ def generate_set_variable_task() -> SetVariableTask:
def generate_dynamic_fork_task() -> DynamicForkTask:
return DynamicForkTask(
task_ref_name='dynamic_fork',
pre_fork_task=generate_simple_task(10),
join_task=JoinTask(
'join', join_on=[]
),
Expand Down

0 comments on commit 436585e

Please sign in to comment.