Skip to content

Commit

Permalink
Merge remote-tracking branch 'datahub/master' into feature/athena-imp…
Browse files Browse the repository at this point in the history
…rovements
  • Loading branch information
bossenti committed Oct 15, 2023
2 parents 92c8000 + 78b342f commit f68c9e0
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 59 deletions.
20 changes: 20 additions & 0 deletions datahub-graphql-core/src/main/resources/search.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,26 @@ enum FilterOperator {
Represents the relation: The field exists. If the field is an array, the field is either not present or empty.
"""
EXISTS

"""
Represents the relation: greater than, e.g. ownerCount > 5
"""
GREATER_THAN

"""
Represents the relation: greater than or equal to, e.g. ownerCount >= 5
"""
GREATER_THAN_OR_EQUAL_TO

"""
Represents the relation: less than, e.g. ownerCount < 3
"""
LESS_THAN

"""
Represents the relation: less than or equal to, e.g. ownerCount <= 3
"""
LESS_THAN_OR_EQUAL_TO
}

"""
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/teradata/teradata_pre.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

If you want to run profiling, you need to grant select permission on all the tables you want to profile.

3. If linege or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
3. If lineage or usage extraction is enabled, please, check if query logging is enabled and it is set to size which
will fit for your queries (the default query text size Teradata captures is max 200 chars)
An example of how you can set it for all users:
```sql
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/docs/sources/teradata/teradata_recipe.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ source:
type: teradata
config:
host_port: "myteradatainstance.teradata.com:1025"
#platform_instance: "myteradatainstance"
username: myuser
password: mypassword
#database_pattern:
# allow:
# - "demo_user"
# - "my_database"
# ignoreCase: true
include_table_lineage: true
include_usage_statistics: true
Expand Down
9 changes: 5 additions & 4 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,10 @@
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.17.0"} | sqllineage_lib | sqlglot_lib,
"teradata": sql_common | {"teradatasqlalchemy>=17.20.0.0"},
"teradata": sql_common
| usage_common
| sqlglot_lib
| {"teradatasqlalchemy>=17.20.0.0"},
"trino": sql_common | trino,
"starburst-trino-usage": sql_common | usage_common | trino,
"nifi": {"requests", "packaging", "requests-gssapi"},
Expand Down Expand Up @@ -433,9 +436,7 @@
deepdiff_dep = "deepdiff"
test_api_requirements = {pytest_dep, deepdiff_dep, "PyYAML"}

debug_requirements = {
"memray"
}
debug_requirements = {"memray"}

base_dev_requirements = {
*base_requirements,
Expand Down
156 changes: 104 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional, Set, Union

# This import verifies that the dependencies are available.
Expand All @@ -11,6 +12,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BaseTimeWindowConfig
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand All @@ -32,11 +34,18 @@
from datahub.ingestion.source.usage.usage_common import BaseUsageConfig
from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
from datahub.ingestion.source_report.time_window import BaseTimeWindowReport
from datahub.metadata._schema_classes import (
MetadataChangeEventClass,
SchemaMetadataClass,
ViewPropertiesClass,
)
from datahub.metadata.com.linkedin.pegasus2avro.schema import (
BytesTypeClass,
TimeTypeClass,
)
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.sqlglot_lineage import SchemaResolver, sqlglot_lineage
from datahub.utilities.urns.dataset_urn import DatasetUrn

logger: logging.Logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -64,6 +73,7 @@
@dataclass
class TeradataReport(ProfilingSqlReport, IngestionStageReport, BaseTimeWindowReport):
    # Count of audit-log query entries fed through the SQL lineage parser.
    num_queries_parsed: int = 0
    # Count of view DDL statements fed through the SQL lineage parser.
    num_view_ddl_parsed: int = 0
    # Count of statements whose table-level lineage could not be parsed.
    num_table_parse_failures: int = 0


Expand All @@ -82,17 +92,16 @@ class TeradataConfig(BaseTeradataConfig, BaseTimeWindowConfig):
"This requires to have the table lineage feature enabled.",
)

include_view_lineage = Field(
default=True,
description="Whether to include view lineage in the ingestion. "
"This requires to have the view lineage feature enabled.",
)
usage: BaseUsageConfig = Field(
description="The usage config to use when generating usage statistics",
default=BaseUsageConfig(),
)

use_schema_resolver: bool = Field(
default=True,
description="Read SchemaMetadata aspects from DataHub to aid in SQL parsing. Turn off only for testing.",
hidden_from_docs=True,
)

default_db: Optional[str] = Field(
default=None,
description="The default database to use for unqualified table names",
Expand Down Expand Up @@ -141,46 +150,47 @@ def __init__(self, config: TeradataConfig, ctx: PipelineContext):
self.report: TeradataReport = TeradataReport()
self.graph: Optional[DataHubGraph] = ctx.graph

if self.graph:
if self.config.use_schema_resolver:
self.schema_resolver = (
self.graph.initialize_schema_resolver_from_datahub(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
)
self.urns = self.schema_resolver.get_urns()
else:
self.schema_resolver = self.graph._make_schema_resolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
self.urns = None
else:
self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)
self.urns = None

self.builder: SqlParsingBuilder = SqlParsingBuilder(
usage_config=self.config.usage
if self.config.include_usage_statistics
else None,
generate_lineage=self.config.include_table_lineage,
generate_lineage=True,
generate_usage_statistics=self.config.include_usage_statistics,
generate_operations=self.config.usage.include_operational_stats,
)

self.schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
graph=None,
env=self.config.env,
)

self._view_definition_cache: FileBackedDict[str] = FileBackedDict()

@classmethod
def create(cls, config_dict, ctx):
    """Standard source factory: parse the raw recipe dict and build the source."""
    return cls(TeradataConfig.parse_obj(config_dict), ctx)

def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
    """Emit lineage workunits for every cached view definition.

    Iterates the view definitions collected during schema extraction
    (keyed by dataset urn) and feeds each one through the shared SQL
    lineage parser as a view DDL statement.
    """
    for key in self._view_definition_cache.keys():
        view_definition = self._view_definition_cache[key]
        dataset_urn = DatasetUrn.create_from_string(key)

        db_name: Optional[str] = None
        # We need to get the default db from the dataset urn, otherwise the
        # builder generates the wrong urns for unqualified table names.
        if "." in dataset_urn.get_dataset_name():
            db_name = dataset_urn.get_dataset_name().split(".", 1)[0]

        self.report.num_view_ddl_parsed += 1
        if self.report.num_view_ddl_parsed % 1000 == 0:
            # Fixed: previously logged num_queries_parsed here, so the
            # progress message reported the wrong counter.
            logger.info(f"Parsed {self.report.num_view_ddl_parsed} view ddl")

        yield from self.gen_lineage_from_query(
            query=view_definition, default_database=db_name, is_view_ddl=True
        )

def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
engine = self.get_metadata_engine()
for entry in engine.execute(
Expand All @@ -192,27 +202,43 @@ def get_audit_log_mcps(self) -> Iterable[MetadataWorkUnit]:
if self.report.num_queries_parsed % 1000 == 0:
logger.info(f"Parsed {self.report.num_queries_parsed} queries")

result = sqlglot_lineage(
sql=entry.query,
schema_resolver=self.schema_resolver,
default_db=None,
default_schema=entry.default_database
if entry.default_database
else self.config.default_db,
yield from self.gen_lineage_from_query(
query=entry.query,
default_database=entry.default_database,
timestamp=entry.timestamp,
user=entry.user,
is_view_ddl=False,
)
if result.debug_info.table_error:
logger.debug(
f"Error parsing table lineage, {result.debug_info.table_error}"
)
self.report.num_table_parse_failures += 1
continue

def gen_lineage_from_query(
    self,
    query: str,
    default_database: Optional[str] = None,
    timestamp: Optional[datetime] = None,
    user: Optional[str] = None,
    is_view_ddl: bool = False,
) -> Iterable[MetadataWorkUnit]:
    """Parse one SQL statement with sqlglot and emit the resulting workunits.

    Args:
        query: The SQL text (audit-log query or view DDL) to parse.
        default_database: Schema to resolve unqualified names against;
            falls back to ``self.config.default_db`` when absent.
        timestamp: Query execution time, if known (audit-log entries only).
        user: Executing user name, if known (audit-log entries only).
        is_view_ddl: True when ``query`` is a view definition rather than
            a logged query.
    """
    result = sqlglot_lineage(
        sql=query,
        schema_resolver=self.schema_resolver,
        default_db=None,
        default_schema=default_database
        if default_database
        else self.config.default_db,
    )
    if result.debug_info.table_error:
        logger.debug(
            f"Error parsing table lineage, {result.debug_info.table_error}"
        )
        self.report.num_table_parse_failures += 1
    else:
        yield from self.builder.process_sql_parsing_result(
            result,
            query=query,
            is_view_ddl=is_view_ddl,
            query_timestamp=timestamp,
            # Fixed: the unconditional f-string produced the bogus urn
            # "urn:li:corpuser:None" whenever user was absent (always the
            # case for view DDL); pass None instead so no operation/usage
            # is attributed to a nonexistent user.
            user=f"urn:li:corpuser:{user}" if user else None,
            include_urns=self.schema_resolver.get_urns(),
        )

def get_metadata_engine(self) -> Engine:
Expand All @@ -221,8 +247,34 @@ def get_metadata_engine(self) -> Engine:
return create_engine(url, **self.config.options)

def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit]]:
yield from super().get_workunits_internal()
# Add all schemas to the schema resolver
for wu in super().get_workunits_internal():
if isinstance(wu.metadata, MetadataChangeEventClass):
if wu.metadata.proposedSnapshot:
for aspect in wu.metadata.proposedSnapshot.aspects:
if isinstance(aspect, SchemaMetadataClass):
self.schema_resolver.add_schema_metadata(
wu.metadata.proposedSnapshot.urn,
aspect,
)
break
if isinstance(wu.metadata, MetadataChangeProposalWrapper):
if (
wu.metadata.entityUrn
and isinstance(wu.metadata.aspect, ViewPropertiesClass)
and wu.metadata.aspect.viewLogic
):
self._view_definition_cache[
wu.metadata.entityUrn
] = wu.metadata.aspect.viewLogic
yield wu

if self.config.include_view_lineage:
self.report.report_ingestion_stage_start("view lineage extraction")
yield from self.get_view_lineage()

if self.config.include_table_lineage or self.config.include_usage_statistics:
self.report.report_ingestion_stage_start("audit log extraction")
yield from self.get_audit_log_mcps()
yield from self.builder.gen_workunits()

yield from self.builder.gen_workunits()

0 comments on commit f68c9e0

Please sign in to comment.