From 48a102fcecfe3dda9c5e280e33ab081f4319f4f1 Mon Sep 17 00:00:00 2001 From: Kiran Vasudev Date: Mon, 4 Dec 2023 19:16:58 +0100 Subject: [PATCH 1/2] only return dummy when stg model --- dagger/utilities/dbt_config_parser.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/dagger/utilities/dbt_config_parser.py b/dagger/utilities/dbt_config_parser.py index 6c80cb4..dd16008 100644 --- a/dagger/utilities/dbt_config_parser.py +++ b/dagger/utilities/dbt_config_parser.py @@ -143,18 +143,14 @@ def _generate_dagger_tasks( if node.get("resource_type") == "seed": task = self._get_dummy_task(node) dagger_tasks.append(task) - elif node.get("resource_type") == 'source': + elif node.get("resource_type") == "source": athena_task = self._get_athena_task(node, follow_external_dependency=True) dagger_tasks.append(athena_task) elif node.get("config", {}).get("materialized") == "ephemeral": task = self._get_dummy_task(node, follow_external_dependency=True) dagger_tasks.append(task) elif node.get("name").startswith("stg_"): - source_node_names = node.get("depends_on", {}).get("nodes", []) dagger_tasks.append(self._get_dummy_task(node)) - for source_node_name in source_node_names: - task = self._generate_dagger_tasks(source_node_name) - dagger_tasks.extend(task) else: athena_task = self._get_athena_task(node, follow_external_dependency=True) s3_task = self._get_s3_task(node) From f6c7226606956e74e91cb19e58207f8897fe97a1 Mon Sep 17 00:00:00 2001 From: Kiran Vasudev Date: Mon, 4 Dec 2023 19:19:37 +0100 Subject: [PATCH 2/2] adapted tests --- .../modules/dbt_config_parser_fixtures.py | 68 ++++--------------- tests/utilities/test_dbt_config_parser.py | 12 ++-- 2 files changed, 20 insertions(+), 60 deletions(-) diff --git a/tests/fixtures/modules/dbt_config_parser_fixtures.py b/tests/fixtures/modules/dbt_config_parser_fixtures.py index 432b2a3..2f42778 100644 --- a/tests/fixtures/modules/dbt_config_parser_fixtures.py +++ b/tests/fixtures/modules/dbt_config_parser_fixtures.py @@ -165,34 +165,6 @@ EXPECTED_STAGING_NODE = [ {"name": "stg_core_schema1__table1", "type": "dummy"}, - { - "type": "athena", - "name": "core_schema1__table1_athena", - "schema": "core_schema1", - "table": "table1", - "follow_external_dependency": True, - }, -] -EXPECTED_STAGING_NODE_MULTIPLE_DEPENDENCIES = [ - {"name": "stg_core_schema2__table2", "type": "dummy"}, - { - "type": "athena", - "name": "core_schema2__table2_athena", - "schema": "core_schema2", - "table": "table2", - "follow_external_dependency": True, - }, - { - "type": "athena", - "name": "core_schema2__table3_athena", - "schema": "core_schema2", - "table": "table3", - "follow_external_dependency": True, - }, - { - "type": "dummy", - "name": "seed_buyer_country_overwrite", - }, ] EXPECTED_SEED_NODE = [ @@ -225,21 +197,7 @@ "type": "dummy", "name": "seed_buyer_country_overwrite", }, - {"name": "stg_core_schema2__table2", "type": "dummy"}, - { - "type": "athena", - "name": "core_schema2__table2_athena", - "schema": "core_schema2", - "table": "table2", - "follow_external_dependency": True, - }, - { - "type": "athena", - "name": "core_schema2__table3_athena", - "schema": "core_schema2", - "table": "table3", - "follow_external_dependency": True, - }, + {"name": "stg_core_schema2__table2", "type": "dummy"} ] EXPECTED_EPHEMERAL_NODE = [ @@ -250,23 +208,24 @@ } ] -EXPECTED_DAGGER_INPUTS = [ - {"name": "stg_core_schema2__table2", "type": "dummy"}, +EXPECTED_MODEL_NODE = [ { - "name": "core_schema2__table2_athena", - "schema": "core_schema2", - "table": "table2", "type": "athena", + "name": "analytics_engineering__model1_athena", + "schema": "analytics_engineering", + "table": "model1", "follow_external_dependency": True, }, { - "name": "core_schema2__table3_athena", - "schema": "core_schema2", - "table": "table3", - "type": "athena", - "follow_external_dependency": True, + "bucket": "bucket1-data-lake", + "name": "analytics_engineering__model1_s3", + "path": "path1/model1", + "type": "s3", }, - {"name": "seed_buyer_country_overwrite", "type": "dummy"}, +] + +EXPECTED_DAGGER_INPUTS = [ + {"name": "stg_core_schema2__table2", "type": "dummy"}, { "name": "analytics_engineering__model2_athena", "schema": "analytics_engineering", @@ -285,6 +244,7 @@ "name": "int_model3", "follow_external_dependency": True, }, + {"name": "seed_buyer_country_overwrite", "type": "dummy"}, ] EXPECTED_DBT_STAGING_MODEL_DAGGER_INPUTS = [ diff --git a/tests/utilities/test_dbt_config_parser.py b/tests/utilities/test_dbt_config_parser.py index be9b3dc..c03976d 100644 --- a/tests/utilities/test_dbt_config_parser.py +++ b/tests/utilities/test_dbt_config_parser.py @@ -11,12 +11,12 @@ DBT_MANIFEST_FILE_FIXTURE, DBT_PROFILE_FIXTURE, EXPECTED_STAGING_NODE, - EXPECTED_STAGING_NODE_MULTIPLE_DEPENDENCIES, EXPECTED_SEED_NODE, EXPECTED_MODEL_MULTIPLE_DEPENDENCIES, EXPECTED_EPHEMERAL_NODE, EXPECTED_DBT_STAGING_MODEL_DAGGER_OUTPUTS, EXPECTED_DBT_STAGING_MODEL_DAGGER_INPUTS, + EXPECTED_MODEL_NODE, ) _logger = logging.getLogger("root") @@ -47,16 +47,12 @@ def test_generate_task_configs(self): module.generate_task_configs() - def test_generate_dagger_inputs(self): + def test_generate_dagger_tasks(self): test_inputs = [ ( "model.main.stg_core_schema1__table1", EXPECTED_STAGING_NODE, ), - ( - "model.main.stg_core_schema2__table2", - EXPECTED_STAGING_NODE_MULTIPLE_DEPENDENCIES, - ), ( "seed.main.seed_buyer_country_overwrite", EXPECTED_SEED_NODE, @@ -65,6 +61,10 @@ def test_generate_dagger_inputs(self): "model.main.int_model3", EXPECTED_EPHEMERAL_NODE, ), + ( + "model.main.model1", + EXPECTED_MODEL_NODE, + ), ] for mock_input, expected_output in test_inputs: result = self._dbt_config_parser._generate_dagger_tasks(mock_input)