Skip to content

Commit

Permalink
bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranvasudev committed Dec 4, 2023
1 parent 3c8c92c commit d653977
Showing 1 changed file with 8 additions and 6 deletions.
14 changes: 8 additions & 6 deletions dagger/dag_creator/airflow/dag_creator.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,15 @@ def _create_edge_without_data(self, from_task_id: str, to_task_ids: list, node:
external_task_sensor = self._get_external_task_sensor(
from_task_id, to_task_id, edge_properties.follow_external_dependency
)
self._sensor_dict[to_pipe] = {

if self._sensor_dict.get(to_pipe) is None:
self._sensor_dict[to_pipe] = {}

self._sensor_dict[to_pipe].update({
external_task_sensor_name: external_task_sensor
}
(
self._tasks[self._get_control_flow_task_id(to_pipe)]
>> external_task_sensor
)
})

self._tasks[self._get_control_flow_task_id(to_pipe)] >> external_task_sensor
self._sensor_dict[to_pipe][external_task_sensor_name] >> self._tasks[to_task_id]
else:
self._tasks[self._get_control_flow_task_id(to_pipe)] >> self._tasks[to_task_id]
Expand Down

0 comments on commit d653977

Please sign in to comment.