Skip to content

Commit

Permalink
feat(ingest): support view lineage for all sqlalchemy sources (#9039)
Browse files Browse the repository at this point in the history
  • Loading branch information
mayurinehate authored Oct 26, 2023
1 parent 2ebf33e commit f402090
Show file tree
Hide file tree
Showing 28 changed files with 2,193 additions and 477 deletions.
52 changes: 28 additions & 24 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,22 +101,36 @@
"grpcio-tools>=1.44.0,<2",
}

sql_common = {
# Required for all SQL sources.
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
# https://github.com/great-expectations/great_expectations/pull/5382/files
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
usage_common = {
"sqlparse",
}

sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.5.2.dev45",
}

sql_common = (
{
# Required for all SQL sources.
# This is temporary lower bound that we're open to loosening/tightening as requirements show up
"sqlalchemy>=1.4.39, <2",
# Required for SQL profiling.
"great-expectations>=0.15.12, <=0.15.50",
# scipy version restricted to reduce backtracking, used by great-expectations,
"scipy>=1.7.2",
# GE added handling for higher version of jinja2
# https://github.com/great-expectations/great_expectations/pull/5382/files
# datahub does not depend on traitlets directly but great expectations does.
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
"greenlet",
}
| usage_common
| sqlglot_lib
)

sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
Expand All @@ -125,12 +139,6 @@
"sqlparse==0.4.4",
}

sqlglot_lib = {
# Using an Acryl fork of sqlglot.
# https://github.com/tobymao/sqlglot/compare/main...hsheth2:sqlglot:hsheth?expand=1
"acryl-sqlglot==18.5.2.dev45",
}

aws_common = {
# AWS Python SDK
"boto3",
Expand Down Expand Up @@ -243,10 +251,6 @@

powerbi_report_server = {"requests", "requests_ntlm"}

usage_common = {
"sqlparse",
}

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.9.0",
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel):

class LineageConfig(ConfigModel):
incremental_lineage: bool = Field(
default=True,
default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ def process_sql_parsing_result(
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.
Expand All @@ -130,7 +131,9 @@ def process_sql_parsing_result(
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
column_lineage=result.column_lineage,
column_lineage=result.column_lineage
if include_column_lineage
else None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,13 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
if lineage_aspect.fineGrainedLineages:
yield _lineage_wu_via_read_modify_write(
graph, urn, lineage_aspect, wu.metadata.systemMetadata
)
elif lineage_aspect.upstreams:
yield _convert_upstream_lineage_to_patch(
urn, lineage_aspect, wu.metadata.systemMetadata
)
else:
yield wu
1 change: 1 addition & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns

return [
auto_lowercase_dataset_urns,
auto_status_aspect,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,11 @@ class DBTCommonConfig(
default=False,
description="When enabled, dbt test warnings will be treated as failures.",
)
# override fault value to True.
incremental_lineage: bool = Field(
default=True,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

@validator("target_platform")
def validate_target_platform_value(cls, target_platform: str) -> str:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,6 @@ def get_workunits(
return

self._populate_external_lineage_map(discovered_tables)

if self.config.include_view_lineage:
if len(discovered_views) > 0:
yield from self.get_view_upstream_workunits(
Expand Down Expand Up @@ -200,14 +199,15 @@ def _gen_workunit_from_sql_parsing_result(
self,
dataset_identifier: str,
result: SqlParsingResult,
) -> MetadataWorkUnit:
) -> Iterable[MetadataWorkUnit]:
upstreams, fine_upstreams = self.get_upstreams_from_sql_parsing_result(
self.dataset_urn_builder(dataset_identifier), result
)
self.report.num_views_with_upstreams += 1
return self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)
if upstreams:
self.report.num_views_with_upstreams += 1
yield self._create_upstream_lineage_workunit(
dataset_identifier, upstreams, fine_upstreams
)

def _gen_workunits_from_query_result(
self,
Expand Down Expand Up @@ -251,7 +251,7 @@ def get_view_upstream_workunits(
)
if result:
views_processed.add(view_identifier)
yield self._gen_workunit_from_sql_parsing_result(
yield from self._gen_workunit_from_sql_parsing_result(
view_identifier, result
)
self.report.view_lineage_parse_secs = timer.elapsed_seconds()
Expand Down
83 changes: 71 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import json
import logging
import re
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic.class_validators import validator
from pydantic.fields import Field

# This import verifies that the dependencies are available.
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveDialect, HiveTimestamp
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
Expand All @@ -18,8 +21,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
from datahub.ingestion.source.sql.sql_common import register_custom_type
from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
Expand All @@ -31,6 +36,7 @@
SchemaField,
TimeTypeClass,
)
from datahub.metadata.schema_classes import ViewPropertiesClass
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

Expand Down Expand Up @@ -90,19 +96,34 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
logger.warning(f"Failed to patch method due to {e}")


@reflection.cache # type: ignore
def get_view_names_patched(self, connection, schema=None, **kw):
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
return [row[0] for row in connection.execute(query)]


@reflection.cache # type: ignore
def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
full_table = self.identifier_preparer.quote_identifier(view_name)
if schema:
full_table = "{}.{}".format(
self.identifier_preparer.quote_identifier(schema),
self.identifier_preparer.quote_identifier(view_name),
)
row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
return row[0]


HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched


class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)

# Hive SQLAlchemy connector returns views as tables.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# Disabling views helps us prevent this duplication.
include_views = Field(
default=False,
hidden_from_docs=True,
description="Hive SQLAlchemy connector returns views as tables. See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273. Disabling views helps us prevent this duplication.",
)

@validator("host_port")
def clean_host_port(cls, v):
return config_clean.remove_protocol(v)
Expand Down Expand Up @@ -174,3 +195,41 @@ def get_schema_fields_for_column(
return new_fields

return fields

# Hive SQLAlchemy connector returns views as tables in get_table_names.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# This override makes sure that we ingest view definitions for views
def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)

try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

if view_definition:
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
20 changes: 10 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,6 @@ class BasePostgresConfig(BasicSQLAlchemyConfig):


class PostgresConfig(BasePostgresConfig):
include_view_lineage = Field(
default=False, description="Include table lineage for views"
)

database_pattern: AllowDenyPattern = Field(
default=AllowDenyPattern.allow_all(),
description=(
Expand Down Expand Up @@ -183,9 +179,10 @@ def get_inspectors(self) -> Iterable[Inspector]:
def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()

for inspector in self.get_inspectors():
if self.config.include_view_lineage:
yield from self._get_view_lineage_workunits(inspector)
if self.views_failed_parsing:
for inspector in self.get_inspectors():
if self.config.include_view_lineage:
yield from self._get_view_lineage_workunits(inspector)

def _get_view_lineage_elements(
self, inspector: Inspector
Expand Down Expand Up @@ -245,11 +242,14 @@ def _get_view_lineage_workunits(
dependent_view, dependent_schema = key

# Construct a lineage object.
view_identifier = self.get_identifier(
schema=dependent_schema, entity=dependent_view, inspector=inspector
)
if view_identifier not in self.views_failed_parsing:
return
urn = mce_builder.make_dataset_urn_with_platform_instance(
platform=self.platform,
name=self.get_identifier(
schema=dependent_schema, entity=dependent_view, inspector=inspector
),
name=view_identifier,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
Expand Down
Loading

0 comments on commit f402090

Please sign in to comment.