feat(ingest): support view lineage for all sqlalchemy sources #9039

2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/configuration/common.py
@@ -283,7 +283,7 @@ class VersionedConfig(ConfigModel):

class LineageConfig(ConfigModel):
incremental_lineage: bool = Field(
-default=True,
+default=False,
description="When enabled, emits lineage as incremental to existing lineage already in DataHub. When disabled, re-states lineage on each run.",
)

Collaborator Author: Incremental lineage requires the presence of a DataHubGraph, which is available by default only when using the DataHub REST sink. We plan to keep this default enabled in managed ingestion.

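A minimal sketch (not code from this PR) of the dependency called out in the comment above: incremental lineage merges into lineage that is already stored in DataHub, so it needs a graph client to read the existing aspect before patching. `DataHubGraph` and `ConfigurationError` are the real classes from the datahub package; the helper name and error message are made up for the sketch.

from typing import Optional

from datahub.configuration.common import ConfigurationError
from datahub.ingestion.graph.client import DataHubGraph


def check_incremental_lineage_support(
    incremental_lineage: bool, graph: Optional[DataHubGraph]
) -> None:
    # Illustrative guard: without a graph client (e.g. via the datahub-rest sink),
    # there is nothing to read existing lineage from, so incremental mode cannot
    # work and full (re-stated) lineage is the only option.
    if incremental_lineage and graph is None:
        raise ConfigurationError(
            "incremental_lineage requires a DataHub graph connection, "
            "e.g. the datahub-rest sink."
        )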
@@ -106,6 +106,7 @@ def process_sql_parsing_result(
user: Optional[UserUrn] = None,
custom_operation_type: Optional[str] = None,
include_urns: Optional[Set[DatasetUrn]] = None,
include_column_lineage: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""Process a single query and yield any generated workunits.

@@ -130,7 +131,9 @@
_merge_lineage_data(
downstream_urn=downstream_urn,
upstream_urns=result.in_tables,
-column_lineage=result.column_lineage,
+column_lineage=result.column_lineage
+if include_column_lineage
+else None,
upstream_edges=self._lineage_map[downstream_urn],
query_timestamp=query_timestamp,
is_view_ddl=is_view_ddl,
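A hedged sketch of how a source-side caller might use the new flag. The wrapper below is hypothetical: `builder` stands in for a SqlParsingBuilder-like object, `parsed` for a SqlParsingResult, and only the `include_column_lineage` keyword comes from this diff.

from typing import Any, Iterable


def emit_view_lineage(
    builder: Any, parsed: Any, include_column_lineage: bool
) -> Iterable[Any]:
    # Forward the parser output to the lineage builder, letting the caller decide
    # whether column-level lineage should be emitted alongside table-level edges.
    yield from builder.process_sql_parsing_result(
        parsed,
        include_column_lineage=include_column_lineage,
    )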
@@ -130,10 +130,13 @@ def auto_incremental_lineage(
if len(wu.metadata.proposedSnapshot.aspects) > 0:
yield wu

-yield _lineage_wu_via_read_modify_write(
-    graph, urn, lineage_aspect, wu.metadata.systemMetadata
-) if lineage_aspect.fineGrainedLineages else _convert_upstream_lineage_to_patch(
-    urn, lineage_aspect, wu.metadata.systemMetadata
-)
+if lineage_aspect.fineGrainedLineages:
+    yield _lineage_wu_via_read_modify_write(
+        graph, urn, lineage_aspect, wu.metadata.systemMetadata
+    )
+elif lineage_aspect.upstreams:
+    yield _convert_upstream_lineage_to_patch(
+        urn, lineage_aspect, wu.metadata.systemMetadata
+    )

@mayurinehate (Collaborator Author), Oct 19, 2023: If there is a table-level-only upstream lineage aspect with empty upstreams, we ignore it as part of incremental lineage.

else:
yield wu
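
For reference, a sketch of which branch each aspect shape now takes; the URN below is a placeholder, while `UpstreamClass` and `UpstreamLineageClass` are the real generated classes.

from datahub.metadata.schema_classes import UpstreamClass, UpstreamLineageClass

# Aspect with fine-grained (column-level) lineage -> read-modify-write against the graph.
# Aspect with table-level upstreams only -> converted to a patch proposal:
table_level_only = UpstreamLineageClass(
    upstreams=[
        UpstreamClass(
            dataset="urn:li:dataset:(urn:li:dataPlatform:hive,db.upstream_table,PROD)",
            type="TRANSFORMED",
        )
    ]
)

# Aspect with no upstreams and no fine-grained lineage -> dropped rather than emitted,
# per the review comment above.
empty_lineage = UpstreamLineageClass(upstreams=[])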
24 changes: 24 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -26,6 +26,7 @@
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
@@ -215,12 +216,35 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
)
):
auto_lowercase_dataset_urns = auto_lowercase_urns

incremental_lineage_processor: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.source
and self.ctx.pipeline_config.source.config
):
incremental_lineage = (
hasattr(
self.ctx.pipeline_config.source.config,
"incremental_lineage",
)
and self.ctx.pipeline_config.source.config.incremental_lineage
) or (
hasattr(self.ctx.pipeline_config.source.config, "get")
and self.ctx.pipeline_config.source.config.get("incremental_lineage")
)
incremental_lineage_processor = partial(
auto_incremental_lineage,
self.ctx.graph,
incremental_lineage,
)
return [
auto_lowercase_dataset_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
partial(auto_workunit_reporter, self.get_report()),
incremental_lineage_processor,
]

@staticmethod
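This generic wiring replaces the per-source setup (the `partial(auto_incremental_lineage, ...)` entry removed from a source-specific `get_workunit_processors` further down in this diff). A hedged illustration of why both attribute access and `.get()` are checked, with class and variable names made up for the sketch: depending on how the pipeline was constructed, the source config on the context may be a parsed pydantic model or still a plain dict, and the flag must be found in either shape.

class ParsedSourceConfig:
    # stand-in for a pydantic config model that inherits LineageConfig
    incremental_lineage = True


raw_config = {"incremental_lineage": True}

for config in (ParsedSourceConfig(), raw_config):
    incremental_lineage = (
        hasattr(config, "incremental_lineage") and config.incremental_lineage
    ) or (hasattr(config, "get") and config.get("incremental_lineage"))
    print(type(config).__name__, bool(incremental_lineage))  # True for both shapes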
@@ -200,14 +200,15 @@ def _gen_workunit_from_sql_parsing_result(
self,
dataset_identifier: str,
result: SqlParsingResult,
-) -> MetadataWorkUnit:
+) -> Iterable[MetadataWorkUnit]:
upstreams, fine_upstreams = self.get_upstreams_from_sql_parsing_result(
self.dataset_urn_builder(dataset_identifier), result
)
-self.report.num_views_with_upstreams += 1
-return self._create_upstream_lineage_workunit(
-    dataset_identifier, upstreams, fine_upstreams
-)
+if upstreams:
+    self.report.num_views_with_upstreams += 1
+    yield self._create_upstream_lineage_workunit(
+        dataset_identifier, upstreams, fine_upstreams
+    )

Collaborator Author: We should not emit upstream lineage if no upstreams are found.

def _gen_workunits_from_query_result(
self,
@@ -251,7 +252,7 @@ def get_view_upstream_workunits(
)
if result:
views_processed.add(view_identifier)
-yield self._gen_workunit_from_sql_parsing_result(
+yield from self._gen_workunit_from_sql_parsing_result(
view_identifier, result
)
self.report.view_lineage_parse_secs = timer.elapsed_seconds()
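A quick reminder, with toy names not taken from the PR, of why the call site switches to `yield from`: once the helper is a generator that may yield zero or one workunits, a bare `yield` would emit the generator object itself instead of its items.

from typing import Iterable, List


def maybe_workunit(upstreams: List[str]) -> Iterable[str]:
    # Yields nothing when there is nothing to emit, mirroring the
    # "no upstreams -> no lineage workunit" behaviour above.
    if upstreams:
        yield f"lineage workunit with {len(upstreams)} upstream(s)"


def caller(upstreams: List[str]) -> Iterable[str]:
    # `yield maybe_workunit(upstreams)` would emit a generator object here.
    yield from maybe_workunit(upstreams)


print(list(caller(["tbl_a"])))  # ['lineage workunit with 1 upstream(s)']
print(list(caller([])))         # []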
@@ -4,7 +4,6 @@
import os.path
import platform
from dataclasses import dataclass
-from functools import partial
from typing import Callable, Dict, Iterable, List, Optional, Union

import pandas as pd
@@ -27,7 +26,6 @@
platform_name,
support_status,
)
-from datahub.ingestion.api.incremental_lineage_helper import auto_incremental_lineage
from datahub.ingestion.api.source import (
CapabilityReport,
MetadataWorkUnitProcessor,
@@ -513,11 +511,6 @@ def _init_schema_resolver(self) -> SchemaResolver:
def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
-partial(
-    auto_incremental_lineage,
-    self.ctx.graph,
-    self.config.incremental_lineage,
-),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
86 changes: 75 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/source/sql/hive.py
@@ -1,15 +1,18 @@
import json
import logging
import re
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional, Union

from pydantic.class_validators import validator
from pydantic.fields import Field

# This import verifies that the dependencies are available.
from pyhive import hive # noqa: F401
from pyhive.sqlalchemy_hive import HiveDate, HiveDecimal, HiveTimestamp
from sqlalchemy.engine.reflection import Inspector

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.decorators import (
SourceCapability,
SupportStatus,
@@ -18,8 +21,10 @@
platform_name,
support_status,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.extractor import schema_util
-from datahub.ingestion.source.sql.sql_common import register_custom_type
+from datahub.ingestion.source.sql.sql_common import SqlWorkUnit, register_custom_type
from datahub.ingestion.source.sql.sql_config import SQLCommonConfig
from datahub.ingestion.source.sql.two_tier_sql_source import (
TwoTierSQLAlchemyConfig,
TwoTierSQLAlchemySource,
@@ -31,6 +36,7 @@
SchemaField,
TimeTypeClass,
)
from datahub.metadata.schema_classes import ViewPropertiesClass
from datahub.utilities import config_clean
from datahub.utilities.hive_schema_to_avro import get_avro_schema_for_hive_column

@@ -90,19 +96,39 @@ def dbapi_get_columns_patched(self, connection, table_name, schema=None, **kw):
logger.warning(f"Failed to patch method due to {e}")


try:
from pyhive.sqlalchemy_hive import HiveDialect
Collaborator Author: Alternatively, we could also move this code to the acryl PyHive fork (https://github.com/acryldata/PyHive). This seemed simpler and easier to test end to end; open to suggestions here.

Collaborator: This seems fine for now, and we can fix it up when we refactor sql common next week.

@reflection.cache # type: ignore
def get_view_names_patched(self, connection, schema=None, **kw):
query = "SHOW VIEWS"
if schema:
query += " IN " + self.identifier_preparer.quote_identifier(schema)
return [row[0] for row in connection.execute(query)]

@reflection.cache # type: ignore
def get_view_definition_patched(self, connection, view_name, schema=None, **kw):
full_table = self.identifier_preparer.quote_identifier(view_name)
if schema:
full_table = "{}.{}".format(
self.identifier_preparer.quote_identifier(schema),
self.identifier_preparer.quote_identifier(view_name),
)
row = connection.execute("SHOW CREATE TABLE {}".format(full_table)).fetchone()
return row[0]

HiveDialect.get_view_names = get_view_names_patched
HiveDialect.get_view_definition = get_view_definition_patched
except ModuleNotFoundError:
pass
except Exception as e:
Collaborator: Failure to patch should cause the source to fail to load, right?

Collaborator Author: Right. Let me remove this exception handling.

Collaborator Author: Done.

logger.warning(f"Failed to patch method due to {e}")


class HiveConfig(TwoTierSQLAlchemyConfig):
# defaults
scheme = Field(default="hive", hidden_from_docs=True)

# Hive SQLAlchemy connector returns views as tables.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# Disabling views helps us prevent this duplication.
include_views = Field(
default=False,
hidden_from_docs=True,
description="Hive SQLAlchemy connector returns views as tables. See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273. Disabling views helps us prevent this duplication.",
)

@validator("host_port")
def clean_host_port(cls, v):
return config_clean.remove_protocol(v)
@@ -174,3 +200,41 @@ def get_schema_fields_for_column(
return new_fields

return fields

# Hive SQLAlchemy connector returns views as tables in get_table_names.
# See https://github.com/dropbox/PyHive/blob/b21c507a24ed2f2b0cf15b0b6abb1c43f31d3ee0/pyhive/sqlalchemy_hive.py#L270-L273.
# This override makes sure that we ingest view definitions for views
def _process_view(
self,
dataset_name: str,
inspector: Inspector,
schema: str,
view: str,
sql_config: SQLCommonConfig,
) -> Iterable[Union[SqlWorkUnit, MetadataWorkUnit]]:
dataset_urn = make_dataset_urn_with_platform_instance(
self.platform,
dataset_name,
self.config.platform_instance,
self.config.env,
)

try:
view_definition = inspector.get_view_definition(view, schema)
if view_definition is None:
view_definition = ""
else:
# Some dialects return a TextClause instead of a raw string,
# so we need to convert them to a string.
view_definition = str(view_definition)
except NotImplementedError:
view_definition = ""

if view_definition:
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition
)
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
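
A hedged end-to-end sketch of what the dialect patch above enables; the connection URL and schema name are placeholders. Once HiveDialect has get_view_names and get_view_definition, the standard SQLAlchemy inspector that _process_view receives can list views and fetch their DDL, which feeds both the ViewPropertiesClass aspect and SQL-parsing-based view lineage.

from sqlalchemy import create_engine, inspect

# Placeholder connection URL; assumes the patched pyhive dialect from above is in effect.
engine = create_engine("hive://localhost:10000/default")
inspector = inspect(engine)

for view_name in inspector.get_view_names(schema="default"):
    # The patched method runs "SHOW CREATE TABLE" and returns the CREATE VIEW statement.
    ddl = inspector.get_view_definition(view_name, schema="default")
    print(view_name, str(ddl)[:80])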