From 4d990b06bd0df4f51443893e2efb39e09d9818b6 Mon Sep 17 00:00:00 2001 From: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com> Date: Tue, 24 Dec 2024 18:14:51 +0530 Subject: [PATCH] fix(ingest/snowflake): always ingest view and external table ddl lineage (#12191) --- docs/how/updating-datahub.md | 2 +- .../source/snowflake/snowflake_config.py | 28 ++----------------- .../source/snowflake/snowflake_lineage_v2.py | 13 ++------- .../source/snowflake/snowflake_query.py | 9 ------ .../source/snowflake/snowflake_schema_gen.py | 6 +--- .../source/snowflake/snowflake_shares.py | 2 +- .../source/snowflake/snowflake_v2.py | 20 +++++++++---- .../source_report/ingestion_stage.py | 1 + .../tests/integration/snowflake/common.py | 2 -- .../integration/snowflake/test_snowflake.py | 2 -- .../test_snowflake_classification.py | 1 - .../snowflake/test_snowflake_failures.py | 2 -- .../snowflake/test_snowflake_tag.py | 2 -- .../performance/snowflake/test_snowflake.py | 1 - .../unit/snowflake/test_snowflake_source.py | 23 +++++++-------- 15 files changed, 36 insertions(+), 78 deletions(-) diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md index 5bc0e66fa2ff1d..a742ebe0cd8968 100644 --- a/docs/how/updating-datahub.md +++ b/docs/how/updating-datahub.md @@ -17,7 +17,7 @@ This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version. ## Next - +- #12191 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from snowflake ingestion source. View and External Table DDL lineage will always be ingested when definitions are available. - #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.
PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.
Entity urn with `include_workspace_name_in_dataset_urn: false` diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py index 1d1cc3c2af4f08..2b2dcf860cdb07 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py @@ -163,26 +163,13 @@ class SnowflakeConfig( default=True, description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.", ) - include_view_lineage: bool = pydantic.Field( - default=True, - description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.", - ) + + _include_view_lineage = pydantic_removed_field("include_view_lineage") + _include_view_column_lineage = pydantic_removed_field("include_view_column_lineage") ignore_start_time_lineage: bool = False upstream_lineage_in_report: bool = False - @pydantic.root_validator(skip_on_failure=True) - def validate_include_view_lineage(cls, values): - if ( - "include_table_lineage" in values - and not values.get("include_table_lineage") - and values.get("include_view_lineage") - ): - raise ValueError( - "include_table_lineage must be True for include_view_lineage to be set." - ) - return values - class SnowflakeV2Config( SnowflakeConfig, @@ -222,11 +209,6 @@ class SnowflakeV2Config( description="Populates table->table and view->table column lineage. Requires appropriate grants given to the role and the Snowflake Enterprise Edition or above.", ) - include_view_column_lineage: bool = Field( - default=True, - description="Populates view->view and table->view column lineage using DataHub's sql parser.", - ) - use_queries_v2: bool = Field( default=False, description="If enabled, uses the new queries extractor to extract queries from snowflake.", @@ -355,10 +337,6 @@ def get_sql_alchemy_url( self, database=database, username=username, password=password, role=role ) - @property - def parse_view_ddl(self) -> bool: - return self.include_view_column_lineage - @validator("shares") def validate_shares( cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py index b815a6584379ac..6b200590d7ab63 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py @@ -8,7 +8,6 @@ from datahub.configuration.datetimes import parse_absolute_time from datahub.ingestion.api.closeable import Closeable -from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage from datahub.ingestion.source.snowflake.constants import ( LINEAGE_PERMISSION_ERROR, @@ -163,11 +162,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]: self.config.end_time, ) - def get_workunits( + def add_time_based_lineage_to_aggregator( self, discovered_tables: List[str], discovered_views: List[str], - ) -> Iterable[MetadataWorkUnit]: + ) -> None: if not self._should_ingest_lineage(): return @@ -177,9 +176,7 @@ def get_workunits( # snowflake view/table -> snowflake table self.populate_table_upstreams(discovered_tables) - for mcp in self.sql_aggregator.gen_metadata(): - yield mcp.as_workunit() - + def update_state(self): if self.redundant_run_skip_handler: # Update the checkpoint state for this run. self.redundant_run_skip_handler.update_state( @@ -337,10 +334,6 @@ def _fetch_upstream_lineages_for_tables(self) -> Iterable[UpstreamLineageEdge]: start_time_millis=int(self.start_time.timestamp() * 1000), end_time_millis=int(self.end_time.timestamp() * 1000), upstreams_deny_pattern=self.config.temporary_tables_pattern, - # The self.config.include_view_lineage setting is about fetching upstreams of views. - # We always generate lineage pointing at views from tables, even if self.config.include_view_lineage is False. - # TODO: Remove this `include_view_lineage` flag, since it's effectively dead code. - include_view_lineage=True, include_column_lineage=self.config.include_column_lineage, ) try: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py index 97c398c1962d6b..a94b39476b2c22 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py @@ -376,7 +376,6 @@ def view_dependencies() -> str: def table_to_table_lineage_history_v2( start_time_millis: int, end_time_millis: int, - include_view_lineage: bool = True, include_column_lineage: bool = True, upstreams_deny_pattern: List[str] = DEFAULT_TEMP_TABLES_PATTERNS, ) -> str: @@ -385,14 +384,12 @@ def table_to_table_lineage_history_v2( start_time_millis, end_time_millis, upstreams_deny_pattern, - include_view_lineage, ) else: return SnowflakeQuery.table_upstreams_only( start_time_millis, end_time_millis, upstreams_deny_pattern, - include_view_lineage, ) @staticmethod @@ -677,12 +674,9 @@ def table_upstreams_with_column_lineage( start_time_millis: int, end_time_millis: int, upstreams_deny_pattern: List[str], - include_view_lineage: bool = True, ) -> str: allowed_upstream_table_domains = ( SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER - if include_view_lineage - else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER ) upstream_sql_filter = create_deny_regex_sql_filter( @@ -847,12 +841,9 @@ def table_upstreams_only( start_time_millis: int, end_time_millis: int, upstreams_deny_pattern: List[str], - include_view_lineage: bool = True, ) -> str: allowed_upstream_table_domains = ( SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER - if include_view_lineage - else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER ) upstream_sql_filter = create_deny_regex_sql_filter( diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 4b72b09fafe2dd..8a1bf15b7a7bc4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -435,11 +435,7 @@ def _process_schema( ) if self.config.include_views: - if ( - self.aggregator - and self.config.include_view_lineage - and self.config.parse_view_ddl - ): + if self.aggregator: for view in views: view_identifier = self.identifiers.get_dataset_identifier( view.name, schema_name, db_name diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py index 794a6f4a59f46f..606acd53dc3324 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py @@ -72,7 +72,7 @@ def get_shares_workunits( assert len(sibling_dbs) == 1 # SnowflakeLineageExtractor is unaware of database->schema->table hierarchy # hence this lineage code is not written in SnowflakeLineageExtractor - # also this is not governed by configs include_table_lineage and include_view_lineage + # also this is not governed by configs include_table_lineage yield self.get_upstream_lineage_with_primary_sibling( db.name, schema.name, table_name, sibling_dbs[0] ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 884e6c49f5b62a..954e8a29c1a1bd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -82,6 +82,7 @@ LINEAGE_EXTRACTION, METADATA_EXTRACTION, QUERIES_EXTRACTION, + VIEW_PARSING, ) from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator from datahub.utilities.registries.domain_registry import DomainRegistry @@ -103,7 +104,7 @@ @capability(SourceCapability.DESCRIPTIONS, "Enabled by default") @capability( SourceCapability.LINEAGE_COARSE, - "Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`", + "Enabled by default, can be disabled via configuration `include_table_lineage`", ) @capability( SourceCapability.LINEAGE_FINE, @@ -512,15 +513,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: discovered_datasets = discovered_tables + discovered_views if self.config.use_queries_v2: - self.report.set_ingestion_stage("*", "View Parsing") - assert self.aggregator is not None + self.report.set_ingestion_stage("*", VIEW_PARSING) yield from auto_workunit(self.aggregator.gen_metadata()) self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) schema_resolver = self.aggregator._schema_resolver - queries_extractor: SnowflakeQueriesExtractor = SnowflakeQueriesExtractor( + queries_extractor = SnowflakeQueriesExtractor( connection=self.connection, config=SnowflakeQueriesExtractorConfig( window=self.config, @@ -546,13 +546,21 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: queries_extractor.close() else: - if self.config.include_table_lineage and self.lineage_extractor: + if self.lineage_extractor: self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION) - yield from self.lineage_extractor.get_workunits( + self.lineage_extractor.add_time_based_lineage_to_aggregator( discovered_tables=discovered_tables, discovered_views=discovered_views, ) + # This would emit view and external table ddl lineage + # as well as query lineage via lineage_extractor + for mcp in self.aggregator.gen_metadata(): + yield mcp.as_workunit() + + if self.lineage_extractor: + self.lineage_extractor.update_state() + if ( self.config.include_usage_stats or self.config.include_operational_stats ) and self.usage_extractor: diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 92407eaae6e901..42b3b648bd298d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -15,6 +15,7 @@ USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats" USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation" EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage" +VIEW_PARSING = "View Parsing" QUERIES_EXTRACTION = "Queries Extraction" PROFILING = "Profiling" diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py index 862d27186703a8..7b4f5abe1cd462 100644 --- a/metadata-ingestion/tests/integration/snowflake/common.py +++ b/metadata-ingestion/tests/integration/snowflake/common.py @@ -458,7 +458,6 @@ def default_query_results( # noqa: C901 snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( start_time_millis=1654473600000, end_time_millis=1654586220000, - include_view_lineage=True, include_column_lineage=True, ), ): @@ -548,7 +547,6 @@ def default_query_results( # noqa: C901 snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( start_time_millis=1654473600000, end_time_millis=1654586220000, - include_view_lineage=True, include_column_lineage=False, ), ): diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py index 1d7470d24f7689..ef4918a20e640c 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py @@ -117,7 +117,6 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph): schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), include_technical_schema=True, include_table_lineage=True, - include_view_lineage=True, include_usage_stats=True, format_sql_queries=True, validate_upstreams_against_patterns=False, @@ -216,7 +215,6 @@ def test_snowflake_private_link_and_incremental_mcps( include_table_lineage=True, include_column_lineage=False, include_views=True, - include_view_lineage=True, include_usage_stats=False, format_sql_queries=True, incremental_lineage=False, diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py index 75a9df4f280512..52453b30f740ab 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py @@ -66,7 +66,6 @@ def test_snowflake_classification_perf(num_workers, num_cols_per_table, num_tabl schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), include_technical_schema=True, include_table_lineage=False, - include_view_lineage=False, include_column_lineage=False, include_usage_stats=False, include_operational_stats=False, diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py index 0b838b0bb59c3a..de6e996a52642b 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py @@ -49,7 +49,6 @@ def snowflake_pipeline_config(tmp_path): include_technical_schema=True, match_fully_qualified_names=True, schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]), - include_view_lineage=False, include_usage_stats=False, start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace( tzinfo=timezone.utc @@ -227,7 +226,6 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure( snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2( start_time_millis=1654473600000, end_time_millis=1654586220000, - include_view_lineage=True, include_column_lineage=True, ) ], diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py index d5e265e7838825..9bb598cb0c1c7f 100644 --- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py +++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py @@ -30,7 +30,6 @@ def test_snowflake_tag_pattern(): ), include_technical_schema=True, include_table_lineage=False, - include_view_lineage=False, include_column_lineage=False, include_usage_stats=False, include_operational_stats=False, @@ -74,7 +73,6 @@ def test_snowflake_tag_pattern_deny(): ), include_technical_schema=True, include_table_lineage=False, - include_view_lineage=False, include_column_lineage=False, include_usage_stats=False, include_operational_stats=False, diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py index 5042c78c2e7b91..984d9e42957452 100644 --- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py @@ -37,7 +37,6 @@ def run_test(): password="TST_PWD", include_technical_schema=False, include_table_lineage=True, - include_view_lineage=True, include_usage_stats=True, include_operational_stats=True, start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(tzinfo=timezone.utc), diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py index 2ff85a08f052f9..75f32b535eb2e8 100644 --- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py +++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py @@ -257,17 +257,6 @@ def test_options_contain_connect_args(): assert connect_args is not None -def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error(): - config_dict = default_config_dict.copy() - config_dict["include_view_lineage"] = True - config_dict["include_table_lineage"] = False - with pytest.raises( - ValidationError, - match="include_table_lineage must be True for include_view_lineage to be set", - ): - SnowflakeV2Config.parse_obj(config_dict) - - def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error(): config_dict = default_config_dict.copy() config_dict["include_column_lineage"] = True @@ -667,6 +656,18 @@ def test_snowflake_utils() -> None: assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils) +def test_using_removed_fields_causes_no_error() -> None: + assert SnowflakeV2Config.parse_obj( + { + "account_id": "test", + "username": "snowflake", + "password": "snowflake", + "include_view_lineage": "true", + "include_view_column_lineage": "true", + } + ) + + def test_snowflake_query_result_parsing(): db_row = { "DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table",