Skip to content

Commit

Permalink
feat(ingest/dbt): skip CLL on sources with skip_sources_in_lineage (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Aug 19, 2024
1 parent 291fc41 commit bb8cf97
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 786 deletions.
37 changes: 23 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -770,23 +770,30 @@ def make_mapping_upstream_lineage(
downstream_urn: str,
node: DBTNode,
convert_column_urns_to_lowercase: bool,
skip_sources_in_lineage: bool,
) -> UpstreamLineageClass:
cll = []
for column in node.columns or []:
field_name = column.name
if convert_column_urns_to_lowercase:
field_name = field_name.lower()

cll.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[mce_builder.make_schema_field_urn(upstream_urn, field_name)],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
mce_builder.make_schema_field_urn(downstream_urn, field_name)
],
if not (node.node_type == "source" and skip_sources_in_lineage):
# For source nodes when `skip_sources_in_lineage` is enabled, we still want to generate
# table-level lineage (for siblings) but not column-level lineage (CLL). That's because CLL
# would make it look like the warehouse node has downstream column lineage, when it's really just empty.
for column in node.columns or []:
field_name = column.name
if convert_column_urns_to_lowercase:
field_name = field_name.lower()

cll.append(
FineGrainedLineage(
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
upstreams=[
mce_builder.make_schema_field_urn(upstream_urn, field_name)
],
downstreamType=FineGrainedLineageDownstreamType.FIELD,
downstreams=[
mce_builder.make_schema_field_urn(downstream_urn, field_name)
],
)
)
)

return UpstreamLineageClass(
upstreams=[
Expand Down Expand Up @@ -1477,6 +1484,7 @@ def create_target_platform_mces(
downstream_urn=node_datahub_urn,
node=node,
convert_column_urns_to_lowercase=self.config.convert_column_urns_to_lowercase,
skip_sources_in_lineage=self.config.skip_sources_in_lineage,
)
if self.config.incremental_lineage:
# We only generate incremental lineage for non-dbt nodes.
Expand Down Expand Up @@ -1822,6 +1830,7 @@ def _create_lineage_aspect_for_dbt_node(
downstream_urn=node_urn,
node=node,
convert_column_urns_to_lowercase=self.config.convert_column_urns_to_lowercase,
skip_sources_in_lineage=self.config.skip_sources_in_lineage,
)
else:
upstream_urns = get_upstreams(
Expand Down
Loading

0 comments on commit bb8cf97

Please sign in to comment.