fix(ingest): replace sqllineage/sqlparse with our SQL parser (#12020)
sagar-salvi-apptware authored Dec 10, 2024
1 parent 61fffb2 commit 57b12bd
Showing 13 changed files with 130 additions and 435 deletions.
23 changes: 14 additions & 9 deletions docs/how/updating-datahub.md
@@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next

- #11560 - The PowerBI ingestion source configuration option `include_workspace_name_in_dataset_urn` determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows semantic models and their tables to have identical names across workspaces, so in a multi-workspace ingestion one semantic model can overwrite another.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`

```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.]<SemanticModelName>.<TableName>,<ENV>)
```

Entity urn with `include_workspace_name_in_dataset_urn: true`

```
urn:li:dataset:(urn:li:dataPlatform:powerbi,[<PlatformInstance>.].<WorkspaceName>.<SemanticModelName>.<TableName>,<ENV>)
```
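
Illustratively, the same difference can be reproduced with the URN helper that DataHub sources use; the workspace, model, and table names below are hypothetical:

```python
# Hypothetical names, for illustration only: this mirrors how the two settings
# change the dataset-name portion of the URN.
import datahub.emitter.mce_builder as builder

# include_workspace_name_in_dataset_urn: false
builder.make_dataset_urn("powerbi", "SalesModel.Orders", "PROD")
# -> urn:li:dataset:(urn:li:dataPlatform:powerbi,SalesModel.Orders,PROD)

# include_workspace_name_in_dataset_urn: true
builder.make_dataset_urn("powerbi", "FinanceWorkspace.SalesModel.Orders", "PROD")
# -> urn:li:dataset:(urn:li:dataPlatform:powerbi,FinanceWorkspace.SalesModel.Orders,PROD)
```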

The config `include_workspace_name_in_dataset_urn` defaults to `false` for backward compatibility; however, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all PowerBI data via the DataHub CLI:
`datahub delete --platform powerbi --soft`, and then re-ingesting with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to `true`.

- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting the database directly within `sources_to_platform_instance.<key>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
@@ -48,6 +50,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction.
- #12020 - Removed the `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base` and `datahub.utilities.sql_lineage_parser_impl` modules along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from the `datahub.sql_parsing.sqlglot_lineage` module instead.
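
For reference, a minimal migration sketch (the query, platform, and env values below are illustrative, not taken from this commit):

```python
# A sketch of replacing the removed SqlLineageSQLParser / DefaultSQLParser usage
# with the sqlglot-based parser; query, platform, and env values are made up.
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.urns.dataset_urn import DatasetUrn

query = (
    "SELECT o.id, o.amount FROM analytics.orders o "
    "JOIN analytics.customers c ON o.customer_id = c.id"
)

result = create_lineage_sql_parsed_result(
    query=query,
    platform="postgres",   # data platform the query runs against
    env="PROD",
    platform_instance=None,
    default_db=None,       # set this to qualify unqualified table names
)

if result.debug_info.table_error:
    # Parsing failed; log and fall back instead of raising.
    print(f"SQL parser failed: {result.debug_info.table_error}")
else:
    # in_tables holds upstream dataset URNs; .name recovers the table name
    # (exact qualification depends on the platform and default_db).
    print([DatasetUrn.from_string(urn).name for urn in result.in_tables])
```
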
12 changes: 1 addition & 11 deletions metadata-ingestion-modules/gx-plugin/setup.py
@@ -15,15 +15,6 @@ def get_long_description():

rest_common = {"requests", "requests_file"}

# TODO: Can we move away from sqllineage and use sqlglot ??
sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
# There have previously been issues from not pinning sqlparse, so it's best to pin it.
# Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
"sqlparse==0.4.4",
}

_version: str = package_metadata["__version__"]
_self_pin = (
f"=={_version}"
@@ -43,8 +34,7 @@ def get_long_description():
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
*rest_common,
*sqllineage_lib,
f"acryl-datahub[datahub-rest]{_self_pin}",
f"acryl-datahub[datahub-rest,sql-parser]{_self_pin}",
}

mypy_stubs = {
@@ -34,8 +34,9 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED
from datahub.utilities.sql_parser import DefaultSQLParser
from datahub.utilities.urns.dataset_urn import DatasetUrn
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
@@ -677,10 +678,23 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
query=query,
customProperties=batchSpecProperties,
)
try:
tables = DefaultSQLParser(query).get_tables()
except Exception as e:
logger.warning(f"Sql parser failed on {query} with {e}")

data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri))
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
platform=data_platform,
env=self.env,
platform_instance=None,
default_db=None,
)
tables = [
DatasetUrn.from_string(table_urn).name
for table_urn in sql_parser_in_tables.in_tables
]
if sql_parser_in_tables.debug_info.table_error:
logger.warning(
f"Sql parser failed on {query} with {sql_parser_in_tables.debug_info.table_error}"
)
tables = []

if len(set(tables)) != 1:
7 changes: 2 additions & 5 deletions metadata-ingestion/docs/sources/redash/redash.md
@@ -1,5 +1,2 @@
Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: true`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
As this package doesn't officially support all the SQL dialects that Redash supports, the result might not be correct. You can, however, implement a
custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser`
and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser.
Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
but can be enabled by setting `parse_table_names_from_sql: true`. The parser is based on the [`sqlglot`](https://pypi.org/project/sqlglot/) package.
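
A minimal sketch of enabling this from a programmatic ingestion run (the `connect_uri` and `api_key` values are placeholders, and the surrounding recipe is abbreviated):

```python
# A sketch only: connection values are placeholders, not a working Redash endpoint.
from datahub.ingestion.run.pipeline import Pipeline

pipeline = Pipeline.create(
    {
        "source": {
            "type": "redash",
            "config": {
                "connect_uri": "http://localhost:5000",
                "api_key": "REDASH_API_KEY",
                # Enable sqlglot-based parsing of chart queries for lineage.
                "parse_table_names_from_sql": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()
```
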
23 changes: 5 additions & 18 deletions metadata-ingestion/setup.py
@@ -159,14 +159,6 @@
| classification_lib
)

sqllineage_lib = {
"sqllineage==1.3.8",
# We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
# There have previously been issues from not pinning sqlparse, so it's best to pin it.
# Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
"sqlparse==0.4.4",
}

aws_common = {
# AWS Python SDK
"boto3",
@@ -216,7 +208,6 @@
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector>=2.1.0",
*sqllineage_lib,
*path_spec_common,
}

@@ -464,9 +455,7 @@
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
| sqllineage_lib
| sqlglot_lib,
"mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
@@ -482,7 +471,7 @@
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
"redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
"redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
@@ -503,9 +492,7 @@
"slack": slack,
"superset": superset_common,
"preset": superset_common,
# FIXME: I don't think tableau uses sqllineage anymore so we should be able
# to remove that dependency.
"tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
"tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib,
"teradata": sql_common
| usage_common
| sqlglot_lib
Expand All @@ -527,9 +514,9 @@
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
"unity-catalog": databricks | sql_common | sqllineage_lib,
"unity-catalog": databricks | sql_common,
# databricks is alias for unity-catalog and needs to be kept in sync
"databricks": databricks | sql_common | sqllineage_lib,
"databricks": databricks | sql_common,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib,
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
23 changes: 0 additions & 23 deletions metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -18,7 +18,6 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential

import datahub.emitter.mce_builder as builder
@@ -820,28 +819,6 @@ def _get_definition(self, definition_name):
)
return None

@lru_cache(maxsize=None)
def _get_source_from_query(self, raw_query: str) -> set:
query = self._replace_definitions(raw_query)
parser = LineageRunner(query)
source_paths = set()
try:
for table in parser.source_tables:
sources = str(table).split(".")
source_schema, source_table = sources[-2], sources[-1]
if source_schema == "<default>":
source_schema = str(self.config.default_schema)

source_paths.add(f"{source_schema}.{source_table}")
except Exception as e:
self.report.report_failure(
title="Failed to Extract Lineage From Query",
message="Unable to retrieve lineage from Mode query.",
context=f"Query: {raw_query}, Error: {str(e)}",
)

return source_paths

def _get_datasource_urn(
self,
platform: str,
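
The replacement inside `mode.py` falls outside the lines shown above; a rough sqlglot-based equivalent of the removed helper, reusing the APIs introduced elsewhere in this commit, might look like this:

```python
# Not the actual code from this commit (the replacement is outside the shown hunk);
# a rough equivalent of the removed `_get_source_from_query`, built on the
# sqlglot-based parser and URN helper used elsewhere in this diff.
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.urns.dataset_urn import DatasetUrn


def get_source_tables(query: str, platform: str) -> set:
    result = create_lineage_sql_parsed_result(
        query=query,
        platform=platform,
        env="PROD",
        platform_instance=None,
        default_db=None,
    )
    if result.debug_info.table_error:
        # Mirror the old behavior of returning nothing on parse failure.
        return set()
    # URN names are fully qualified; keep the trailing schema.table portion to
    # match the shape the removed helper produced.
    return {
        ".".join(DatasetUrn.from_string(urn).name.split(".")[-2:])
        for urn in result.in_tables
    }
```
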
76 changes: 13 additions & 63 deletions metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -2,7 +2,7 @@
import math
import sys
from dataclasses import dataclass, field
from typing import Dict, Iterable, List, Optional, Set, Type
from typing import Dict, Iterable, List, Optional, Set

import dateutil.parser as dp
from packaging import version
Expand All @@ -22,7 +22,6 @@
platform_name,
support_status,
)
from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
Expand All @@ -39,9 +38,9 @@
ChartTypeClass,
DashboardInfoClass,
)
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.sql_parser_base import SQLParser
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor

logger = logging.getLogger(__name__)
@@ -270,10 +269,6 @@ class RedashConfig(ConfigModel):
parse_table_names_from_sql: bool = Field(
default=False, description="See note below."
)
sql_parser: str = Field(
default="datahub.utilities.sql_parser.DefaultSQLParser",
description="custom SQL parser. See note below for details.",
)

env: str = Field(
default=DEFAULT_ENV,
@@ -354,7 +349,6 @@ def __init__(self, ctx: PipelineContext, config: RedashConfig):
self.api_page_limit = self.config.api_page_limit or math.inf

self.parse_table_names_from_sql = self.config.parse_table_names_from_sql
self.sql_parser_path = self.config.sql_parser

logger.info(
f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
@@ -380,31 +374,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = RedashConfig.parse_obj(config_dict)
return cls(ctx, config)

@classmethod
def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
assert "." in sql_parser_path, "sql_parser-path must contain a ."
parser_cls = import_path(sql_parser_path)

if not issubclass(parser_cls, SQLParser):
raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
return parser_cls

@classmethod
def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
parser_cls = cls._import_sql_parser_cls(sql_parser_path)

try:
sql_table_names: List[str] = parser_cls(sql).get_tables()
except Exception as e:
logger.warning(f"Sql parser failed on {sql} with {e}")
return []

# Remove quotes from table names
sql_table_names = [t.replace('"', "") for t in sql_table_names]
sql_table_names = [t.replace("`", "") for t in sql_table_names]

return sql_table_names

def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
url = f"/api/data_sources/{data_source_id}"
resp = self.client._get(url).json()
@@ -441,14 +410,6 @@ def _get_database_name_based_on_datasource(

return database_name

def _construct_datalineage_urn(
self, platform: str, database_name: str, sql_table_name: str
) -> str:
full_dataset_name = get_full_qualified_name(
platform, database_name, sql_table_name
)
return builder.make_dataset_urn(platform, full_dataset_name, self.config.env)

def _get_datasource_urns(
self, data_source: Dict, sql_query_data: Dict = {}
) -> Optional[List[str]]:
Expand All @@ -464,34 +425,23 @@ def _get_datasource_urns(
# Getting table lineage from SQL parsing
if self.parse_table_names_from_sql and data_source_syntax == "sql":
dataset_urns = list()
try:
sql_table_names = self._get_sql_table_names(
query, self.sql_parser_path
)
except Exception as e:
sql_parser_in_tables = create_lineage_sql_parsed_result(
query=query,
platform=platform,
env=self.config.env,
platform_instance=None,
default_db=database_name,
)
# make sure dataset_urns is not empty list
dataset_urns = sql_parser_in_tables.in_tables
if sql_parser_in_tables.debug_info.table_error:
self.report.queries_problem_parsing.add(str(query_id))
self.error(
logger,
"sql-parsing",
f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}",
f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}",
)
sql_table_names = []
for sql_table_name in sql_table_names:
try:
dataset_urns.append(
self._construct_datalineage_urn(
platform, database_name, sql_table_name
)
)
except Exception:
self.report.queries_problem_parsing.add(str(query_id))
self.warn(
logger,
"data-urn-invalid",
f"Problem making URN for {sql_table_name} parsed from query {query_id}",
)

# make sure dataset_urns is not empty list
return dataset_urns if len(dataset_urns) > 0 else None

else: