Skip to content

Commit

Permalink
refactor dummy task generation
Browse files Browse the repository at this point in the history
  • Loading branch information
kiranvasudev committed Nov 29, 2023
1 parent ab9a2c8 commit 8ef7050
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions dagger/utilities/dbt_config_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,23 @@ def __init__(self, default_config_parameters: dict):
self._sources_in_manifest = self._manifest_data["sources"]

@staticmethod
def _generate_seed_task(seed_node: dict) -> dict:
def _get_dummy_task(node: dict, follow_external_dependency: bool = False) -> dict:
"""
Generates a dummy dagger task for the DBT seed node
Generates a dummy dagger task
Args:
seed_node: The extracted seed node from the manifest.json file
node: The extracted node from the manifest.json file
Returns:
dict: The dummy dagger task for the DBT seed node
dict: The dummy dagger task for the DBT node
"""
task = {}
task["name"] = seed_node.get("name", "")
task["name"] = node.get("name", "")
task["type"] = "dummy"

if follow_external_dependency:
task["follow_external_dependency"] = True

return task

def _get_athena_task(
Expand Down Expand Up @@ -116,6 +119,7 @@ def _generate_dagger_tasks(
) -> List[Dict]:
"""
Generates the dagger task based on whether the DBT model node is a staging model or not.
If the DBT model node represents a DBT seed or an ephemeral model, then a dagger dummy task is generated.
If the DBT model node represents a staging model, then a dagger athena task is generated for each source of the DBT model.
If the DBT model node is not a staging model, then a dagger athena task and an s3 task is generated for the DBT model node itself.
Args:
Expand All @@ -129,14 +133,17 @@ def _generate_dagger_tasks(
dagger_tasks = []

if node.get("resource_type") == "seed":
task = self._generate_seed_task(node)
task = self._get_dummy_task(node)
dagger_tasks.append(task)
elif node.get("config",{}).get("materialized") == "ephemeral":
task = self._get_dummy_task(node, follow_external_dependency=True)
dagger_tasks.append(task)
elif model_name.startswith("stg_"):
source_node_names = node.get("depends_on", {}).get("nodes", [])
for source_node_name in source_node_names:
if source_node_name.startswith("seed"):
source_node = self._nodes_in_manifest[source_node_name]
task = self._generate_seed_task(source_node)
task = self._get_dummy_task(source_node)
else:
source_node = self._sources_in_manifest[source_node_name]
task = self._get_athena_task(
Expand Down

0 comments on commit 8ef7050

Please sign in to comment.