From dcf4793c3d8fb192b7ae3e3248f50445d59e8912 Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Fri, 18 Oct 2024 03:13:18 -0700 Subject: [PATCH] fix(ingest/dbt): Prevent lineage cycles when parsing sql of dbt models (#11666) --- .../ingestion/source/dbt/dbt_common.py | 6 +++ .../tests/unit/test_dbt_source.py | 42 +++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py index 4cd3c934ce6348..c95d0e545c5989 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py @@ -1989,6 +1989,11 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str: time=mce_builder.get_sys_time(), actor=_DEFAULT_ACTOR, ) + sibling_urn = node.get_urn( + self.config.target_platform, + self.config.env, + self.config.target_platform_instance, + ) return UpstreamLineageClass( upstreams=[ UpstreamClass( @@ -1997,6 +2002,7 @@ def _translate_dbt_name_to_upstream_urn(dbt_name: str) -> str: auditStamp=auditStamp, ) for upstream in upstream_urns + if not (node.node_type == "model" and upstream == sibling_urn) ], fineGrainedLineages=( (cll or None) if self.config.include_column_lineage else None diff --git a/metadata-ingestion/tests/unit/test_dbt_source.py b/metadata-ingestion/tests/unit/test_dbt_source.py index 7d01ecd034523d..f0d4c3408271f7 100644 --- a/metadata-ingestion/tests/unit/test_dbt_source.py +++ b/metadata-ingestion/tests/unit/test_dbt_source.py @@ -10,6 +10,7 @@ from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.dbt import dbt_cloud from datahub.ingestion.source.dbt.dbt_cloud import DBTCloudConfig +from datahub.ingestion.source.dbt.dbt_common import DBTNode from datahub.ingestion.source.dbt.dbt_core import ( DBTCoreConfig, DBTCoreSource, @@ -253,6 +254,47 @@ def test_dbt_config_prefer_sql_parser_lineage(): assert config.prefer_sql_parser_lineage is True +def test_dbt_prefer_sql_parser_lineage_no_self_reference(): + ctx = PipelineContext(run_id="test-run-id") + config = DBTCoreConfig.parse_obj( + { + **create_base_dbt_config(), + "skip_sources_in_lineage": True, + "prefer_sql_parser_lineage": True, + } + ) + source: DBTCoreSource = DBTCoreSource(config, ctx, "dbt") + all_nodes_map = { + "model1": DBTNode( + name="model1", + database=None, + schema=None, + alias=None, + comment="", + description="", + language=None, + raw_code=None, + dbt_adapter="postgres", + dbt_name="model1", + dbt_file_path=None, + dbt_package_name=None, + node_type="model", + materialization="table", + max_loaded_at=None, + catalog_type=None, + missing_from_catalog=False, + owner=None, + compiled_code="SELECT d FROM results WHERE d > (SELECT MAX(d) FROM model1)", + ), + } + source._infer_schemas_and_update_cll(all_nodes_map) + upstream_lineage = source._create_lineage_aspect_for_dbt_node( + all_nodes_map["model1"], all_nodes_map + ) + assert upstream_lineage is not None + assert len(upstream_lineage.upstreams) == 1 + + def test_dbt_s3_config(): # test missing aws config config_dict: dict = {