diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index 5bc0e66fa2ff1..a742ebe0cd896 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -17,7 +17,7 @@
This file documents any backwards-incompatible changes in DataHub and assists people when migrating to a new version.
## Next
-
+- #12191 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from snowflake ingestion source. View and External Table DDL lineage will always be ingested when definitions are available.
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.
PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.
Entity urn with `include_workspace_name_in_dataset_urn: false`
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
index 1d1cc3c2af4f0..2b2dcf860cdb0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_config.py
@@ -163,26 +163,13 @@ class SnowflakeConfig(
default=True,
description="If enabled, populates the snowflake table-to-table and s3-to-snowflake table lineage. Requires appropriate grants given to the role and Snowflake Enterprise Edition or above.",
)
- include_view_lineage: bool = pydantic.Field(
- default=True,
- description="If enabled, populates the snowflake view->table and table->view lineages. Requires appropriate grants given to the role, and include_table_lineage to be True. view->table lineage requires Snowflake Enterprise Edition or above.",
- )
+
+ _include_view_lineage = pydantic_removed_field("include_view_lineage")
+ _include_view_column_lineage = pydantic_removed_field("include_view_column_lineage")
ignore_start_time_lineage: bool = False
upstream_lineage_in_report: bool = False
- @pydantic.root_validator(skip_on_failure=True)
- def validate_include_view_lineage(cls, values):
- if (
- "include_table_lineage" in values
- and not values.get("include_table_lineage")
- and values.get("include_view_lineage")
- ):
- raise ValueError(
- "include_table_lineage must be True for include_view_lineage to be set."
- )
- return values
-
class SnowflakeV2Config(
SnowflakeConfig,
@@ -222,11 +209,6 @@ class SnowflakeV2Config(
description="Populates table->table and view->table column lineage. Requires appropriate grants given to the role and the Snowflake Enterprise Edition or above.",
)
- include_view_column_lineage: bool = Field(
- default=True,
- description="Populates view->view and table->view column lineage using DataHub's sql parser.",
- )
-
use_queries_v2: bool = Field(
default=False,
description="If enabled, uses the new queries extractor to extract queries from snowflake.",
@@ -355,10 +337,6 @@ def get_sql_alchemy_url(
self, database=database, username=username, password=password, role=role
)
- @property
- def parse_view_ddl(self) -> bool:
- return self.include_view_column_lineage
-
@validator("shares")
def validate_shares(
cls, shares: Optional[Dict[str, SnowflakeShareConfig]], values: Dict
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index b815a6584379a..6b200590d7ab6 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -8,7 +8,6 @@
from datahub.configuration.datetimes import parse_absolute_time
from datahub.ingestion.api.closeable import Closeable
-from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.snowflake.constants import (
LINEAGE_PERMISSION_ERROR,
@@ -163,11 +162,11 @@ def get_time_window(self) -> Tuple[datetime, datetime]:
self.config.end_time,
)
- def get_workunits(
+ def add_time_based_lineage_to_aggregator(
self,
discovered_tables: List[str],
discovered_views: List[str],
- ) -> Iterable[MetadataWorkUnit]:
+ ) -> None:
if not self._should_ingest_lineage():
return
@@ -177,9 +176,7 @@ def get_workunits(
# snowflake view/table -> snowflake table
self.populate_table_upstreams(discovered_tables)
- for mcp in self.sql_aggregator.gen_metadata():
- yield mcp.as_workunit()
-
+ def update_state(self):
if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
self.redundant_run_skip_handler.update_state(
@@ -337,10 +334,6 @@ def _fetch_upstream_lineages_for_tables(self) -> Iterable[UpstreamLineageEdge]:
start_time_millis=int(self.start_time.timestamp() * 1000),
end_time_millis=int(self.end_time.timestamp() * 1000),
upstreams_deny_pattern=self.config.temporary_tables_pattern,
- # The self.config.include_view_lineage setting is about fetching upstreams of views.
- # We always generate lineage pointing at views from tables, even if self.config.include_view_lineage is False.
- # TODO: Remove this `include_view_lineage` flag, since it's effectively dead code.
- include_view_lineage=True,
include_column_lineage=self.config.include_column_lineage,
)
try:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
index 97c398c1962d6..a94b39476b2c2 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_query.py
@@ -376,7 +376,6 @@ def view_dependencies() -> str:
def table_to_table_lineage_history_v2(
start_time_millis: int,
end_time_millis: int,
- include_view_lineage: bool = True,
include_column_lineage: bool = True,
upstreams_deny_pattern: List[str] = DEFAULT_TEMP_TABLES_PATTERNS,
) -> str:
@@ -385,14 +384,12 @@ def table_to_table_lineage_history_v2(
start_time_millis,
end_time_millis,
upstreams_deny_pattern,
- include_view_lineage,
)
else:
return SnowflakeQuery.table_upstreams_only(
start_time_millis,
end_time_millis,
upstreams_deny_pattern,
- include_view_lineage,
)
@staticmethod
@@ -677,12 +674,9 @@ def table_upstreams_with_column_lineage(
start_time_millis: int,
end_time_millis: int,
upstreams_deny_pattern: List[str],
- include_view_lineage: bool = True,
) -> str:
allowed_upstream_table_domains = (
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
- if include_view_lineage
- else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
)
upstream_sql_filter = create_deny_regex_sql_filter(
@@ -847,12 +841,9 @@ def table_upstreams_only(
start_time_millis: int,
end_time_millis: int,
upstreams_deny_pattern: List[str],
- include_view_lineage: bool = True,
) -> str:
allowed_upstream_table_domains = (
SnowflakeQuery.ACCESS_HISTORY_TABLE_VIEW_DOMAINS_FILTER
- if include_view_lineage
- else SnowflakeQuery.ACCESS_HISTORY_TABLE_DOMAINS_FILTER
)
upstream_sql_filter = create_deny_regex_sql_filter(
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
index 4b72b09fafe2d..8a1bf15b7a7bc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py
@@ -435,11 +435,7 @@ def _process_schema(
)
if self.config.include_views:
- if (
- self.aggregator
- and self.config.include_view_lineage
- and self.config.parse_view_ddl
- ):
+ if self.aggregator:
for view in views:
view_identifier = self.identifiers.get_dataset_identifier(
view.name, schema_name, db_name
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py
index 794a6f4a59f46..606acd53dc332 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_shares.py
@@ -72,7 +72,7 @@ def get_shares_workunits(
assert len(sibling_dbs) == 1
# SnowflakeLineageExtractor is unaware of database->schema->table hierarchy
# hence this lineage code is not written in SnowflakeLineageExtractor
- # also this is not governed by configs include_table_lineage and include_view_lineage
+ # also this is not governed by configs include_table_lineage
yield self.get_upstream_lineage_with_primary_sibling(
db.name, schema.name, table_name, sibling_dbs[0]
)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
index 884e6c49f5b62..954e8a29c1a1b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py
@@ -82,6 +82,7 @@
LINEAGE_EXTRACTION,
METADATA_EXTRACTION,
QUERIES_EXTRACTION,
+ VIEW_PARSING,
)
from datahub.sql_parsing.sql_parsing_aggregator import SqlParsingAggregator
from datahub.utilities.registries.domain_registry import DomainRegistry
@@ -103,7 +104,7 @@
@capability(SourceCapability.DESCRIPTIONS, "Enabled by default")
@capability(
SourceCapability.LINEAGE_COARSE,
- "Enabled by default, can be disabled via configuration `include_table_lineage` and `include_view_lineage`",
+ "Enabled by default, can be disabled via configuration `include_table_lineage`",
)
@capability(
SourceCapability.LINEAGE_FINE,
@@ -512,15 +513,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
discovered_datasets = discovered_tables + discovered_views
if self.config.use_queries_v2:
- self.report.set_ingestion_stage("*", "View Parsing")
- assert self.aggregator is not None
+ self.report.set_ingestion_stage("*", VIEW_PARSING)
yield from auto_workunit(self.aggregator.gen_metadata())
self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
schema_resolver = self.aggregator._schema_resolver
- queries_extractor: SnowflakeQueriesExtractor = SnowflakeQueriesExtractor(
+ queries_extractor = SnowflakeQueriesExtractor(
connection=self.connection,
config=SnowflakeQueriesExtractorConfig(
window=self.config,
@@ -546,13 +546,21 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
queries_extractor.close()
else:
- if self.config.include_table_lineage and self.lineage_extractor:
+ if self.lineage_extractor:
self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION)
- yield from self.lineage_extractor.get_workunits(
+ self.lineage_extractor.add_time_based_lineage_to_aggregator(
discovered_tables=discovered_tables,
discovered_views=discovered_views,
)
+ # This would emit view and external table ddl lineage
+ # as well as query lineage via lineage_extractor
+ for mcp in self.aggregator.gen_metadata():
+ yield mcp.as_workunit()
+
+ if self.lineage_extractor:
+ self.lineage_extractor.update_state()
+
if (
self.config.include_usage_stats or self.config.include_operational_stats
) and self.usage_extractor:
diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
index 92407eaae6e90..42b3b648bd298 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py
@@ -15,6 +15,7 @@
USAGE_EXTRACTION_OPERATIONAL_STATS = "Usage Extraction Operational Stats"
USAGE_EXTRACTION_USAGE_AGGREGATION = "Usage Extraction Usage Aggregation"
EXTERNAL_TABLE_DDL_LINEAGE = "External table DDL Lineage"
+VIEW_PARSING = "View Parsing"
QUERIES_EXTRACTION = "Queries Extraction"
PROFILING = "Profiling"
diff --git a/metadata-ingestion/tests/integration/snowflake/common.py b/metadata-ingestion/tests/integration/snowflake/common.py
index 862d27186703a..7b4f5abe1cd46 100644
--- a/metadata-ingestion/tests/integration/snowflake/common.py
+++ b/metadata-ingestion/tests/integration/snowflake/common.py
@@ -458,7 +458,6 @@ def default_query_results( # noqa: C901
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
- include_view_lineage=True,
include_column_lineage=True,
),
):
@@ -548,7 +547,6 @@ def default_query_results( # noqa: C901
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
- include_view_lineage=True,
include_column_lineage=False,
),
):
diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py
index 1d7470d24f768..ef4918a20e640 100644
--- a/metadata-ingestion/tests/integration/snowflake/test_snowflake.py
+++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake.py
@@ -117,7 +117,6 @@ def test_snowflake_basic(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=True,
- include_view_lineage=True,
include_usage_stats=True,
format_sql_queries=True,
validate_upstreams_against_patterns=False,
@@ -216,7 +215,6 @@ def test_snowflake_private_link_and_incremental_mcps(
include_table_lineage=True,
include_column_lineage=False,
include_views=True,
- include_view_lineage=True,
include_usage_stats=False,
format_sql_queries=True,
incremental_lineage=False,
diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py
index 75a9df4f28051..52453b30f740a 100644
--- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py
+++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_classification.py
@@ -66,7 +66,6 @@ def test_snowflake_classification_perf(num_workers, num_cols_per_table, num_tabl
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
include_technical_schema=True,
include_table_lineage=False,
- include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py
index 0b838b0bb59c3..de6e996a52642 100644
--- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py
+++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_failures.py
@@ -49,7 +49,6 @@ def snowflake_pipeline_config(tmp_path):
include_technical_schema=True,
match_fully_qualified_names=True,
schema_pattern=AllowDenyPattern(allow=["test_db.test_schema"]),
- include_view_lineage=False,
include_usage_stats=False,
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(
tzinfo=timezone.utc
@@ -227,7 +226,6 @@ def test_snowflake_missing_snowflake_lineage_permission_causes_pipeline_failure(
snowflake_query.SnowflakeQuery.table_to_table_lineage_history_v2(
start_time_millis=1654473600000,
end_time_millis=1654586220000,
- include_view_lineage=True,
include_column_lineage=True,
)
],
diff --git a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py
index d5e265e783882..9bb598cb0c1c7 100644
--- a/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py
+++ b/metadata-ingestion/tests/integration/snowflake/test_snowflake_tag.py
@@ -30,7 +30,6 @@ def test_snowflake_tag_pattern():
),
include_technical_schema=True,
include_table_lineage=False,
- include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
@@ -74,7 +73,6 @@ def test_snowflake_tag_pattern_deny():
),
include_technical_schema=True,
include_table_lineage=False,
- include_view_lineage=False,
include_column_lineage=False,
include_usage_stats=False,
include_operational_stats=False,
diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
index 5042c78c2e7b9..984d9e4295745 100644
--- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
+++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py
@@ -37,7 +37,6 @@ def run_test():
password="TST_PWD",
include_technical_schema=False,
include_table_lineage=True,
- include_view_lineage=True,
include_usage_stats=True,
include_operational_stats=True,
start_time=datetime(2022, 6, 6, 0, 0, 0, 0).replace(tzinfo=timezone.utc),
diff --git a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py
index 2ff85a08f052f..75f32b535eb2e 100644
--- a/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py
+++ b/metadata-ingestion/tests/unit/snowflake/test_snowflake_source.py
@@ -257,17 +257,6 @@ def test_options_contain_connect_args():
assert connect_args is not None
-def test_snowflake_config_with_view_lineage_no_table_lineage_throws_error():
- config_dict = default_config_dict.copy()
- config_dict["include_view_lineage"] = True
- config_dict["include_table_lineage"] = False
- with pytest.raises(
- ValidationError,
- match="include_table_lineage must be True for include_view_lineage to be set",
- ):
- SnowflakeV2Config.parse_obj(config_dict)
-
-
def test_snowflake_config_with_column_lineage_no_table_lineage_throws_error():
config_dict = default_config_dict.copy()
config_dict["include_column_lineage"] = True
@@ -667,6 +656,18 @@ def test_snowflake_utils() -> None:
assert_doctest(datahub.ingestion.source.snowflake.snowflake_utils)
+def test_using_removed_fields_causes_no_error() -> None:
+ assert SnowflakeV2Config.parse_obj(
+ {
+ "account_id": "test",
+ "username": "snowflake",
+ "password": "snowflake",
+ "include_view_lineage": "true",
+ "include_view_column_lineage": "true",
+ }
+ )
+
+
def test_snowflake_query_result_parsing():
db_row = {
"DOWNSTREAM_TABLE_NAME": "db.schema.downstream_table",