From 57b12bd9cb9689638a34932b239540981f95fd6d Mon Sep 17 00:00:00 2001
From: sagar-salvi-apptware
<159135491+sagar-salvi-apptware@users.noreply.github.com>
Date: Tue, 10 Dec 2024 22:06:01 +0530
Subject: [PATCH] fix(ingest): replace sqllineage/sqlparse with our SQL parser
(#12020)
---
docs/how/updating-datahub.md | 23 ++-
metadata-ingestion-modules/gx-plugin/setup.py | 12 +-
.../gx-plugin/src/datahub_gx_plugin/action.py | 24 ++-
.../docs/sources/redash/redash.md | 7 +-
metadata-ingestion/setup.py | 23 +--
.../src/datahub/ingestion/source/mode.py | 23 ---
.../src/datahub/ingestion/source/redash.py | 76 ++-------
.../datahub/ingestion/source/unity/usage.py | 31 ++--
.../utilities/sql_lineage_parser_impl.py | 160 ------------------
.../src/datahub/utilities/sql_parser.py | 94 ----------
.../src/datahub/utilities/sql_parser_base.py | 21 ---
.../tests/unit/test_redash_source.py | 6 +-
.../tests/unit/utilities/test_utilities.py | 65 +++++--
13 files changed, 130 insertions(+), 435 deletions(-)
delete mode 100644 metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
delete mode 100644 metadata-ingestion/src/datahub/utilities/sql_parser.py
delete mode 100644 metadata-ingestion/src/datahub/utilities/sql_parser_base.py
diff --git a/docs/how/updating-datahub.md b/docs/how/updating-datahub.md
index bcc89332cc1c1b..d8fe06abad6252 100644
--- a/docs/how/updating-datahub.md
+++ b/docs/how/updating-datahub.md
@@ -19,19 +19,21 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
## Next
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.
  PowerBI allows semantic models and their tables to have identical names across workspaces, which can cause one semantic model to overwrite another during multi-workspace ingestion.
- Entity urn with `include_workspace_name_in_dataset_urn: false`
- ```
- urn:li:dataset:(urn:li:dataPlatform:powerbi,[.].,)
- ```
+ Entity urn with `include_workspace_name_in_dataset_urn: false`
- Entity urn with `include_workspace_name_in_dataset_urn: true`
- ```
- urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,)
- ```
+ ```
+ urn:li:dataset:(urn:li:dataPlatform:powerbi,[.].,)
+ ```
+
+ Entity urn with `include_workspace_name_in_dataset_urn: true`
+
+ ```
+ urn:li:dataset:(urn:li:dataPlatform:powerbi,[.]...,)
+ ```
  The config `include_workspace_name_in_dataset_urn` defaults to `false` for backward compatibility. However, we recommend enabling this flag after performing the necessary cleanup.
If stateful ingestion is enabled, running ingestion with the latest CLI version will handle the cleanup automatically. Otherwise, we recommend soft deleting all powerbi data via the DataHub CLI:
- `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
+ `datahub delete --platform powerbi --soft` and then re-ingest with the latest CLI version, ensuring the `include_workspace_name_in_dataset_urn` configuration is set to true.
- #11701: The Fivetran `sources_to_database` field is deprecated in favor of setting directly within `sources_to_platform_instance.<source_name>.database`.
- #11742: For PowerBi ingestion, `use_powerbi_email` is now enabled by default when extracting ownership information.
@@ -48,6 +50,9 @@ This file documents any backwards-incompatible changes in DataHub and assists pe
- #11619 - schema field/column paths can no longer be duplicated within the schema
- #11570 - The `DatahubClientConfig`'s server field no longer defaults to `http://localhost:8080`. Be sure to explicitly set this.
- #11570 - If a `datahub_api` is explicitly passed to a stateful ingestion config provider, it will be used. We previously ignored it if the pipeline context also had a graph object.
+- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted (after 10d) or are timeseries _entities_ (dataprocess, execution requests) will be removed automatically using logic in the `datahub-gc` ingestion source.
+- #12020 - Removed `sql_parser` configuration from the Redash source, as Redash now exclusively uses the sqlglot-based parser for lineage extraction.
+- #12020 - Removed the `datahub.utilities.sql_parser`, `datahub.utilities.sql_parser_base`, and `datahub.utilities.sql_lineage_parser_impl` modules along with `SqlLineageSQLParser` and `DefaultSQLParser`. Use `create_lineage_sql_parsed_result` from the `datahub.sql_parsing.sqlglot_lineage` module instead.
- #11518 - DataHub Garbage Collection: Various entities that are soft-deleted
(after 10d) or are timeseries *entities* (dataprocess, execution requests)
  will be removed automatically using logic in the `datahub-gc` ingestion source.
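
For reference, here is a minimal migration sketch for the #12020 note above. It mirrors the pattern this patch applies in the GX plugin and Redash source; the query and platform values are hypothetical:

```python
from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.urns.dataset_urn import DatasetUrn

# Previously: tables = DefaultSQLParser(query).get_tables()
query = "SELECT o.id, o.amount FROM sales.orders o JOIN sales.customers c ON o.customer_id = c.id"

result = create_lineage_sql_parsed_result(
    query=query,
    platform="mysql",         # hypothetical platform
    env="PROD",
    platform_instance=None,
    default_db=None,
)

if result.debug_info.table_error:
    # Parser failed; fall back to an empty list, like the old except-branch did.
    tables = []
else:
    # in_tables holds dataset URNs; .name recovers the dotted table name.
    tables = [DatasetUrn.from_string(urn).name for urn in result.in_tables]
```
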
diff --git a/metadata-ingestion-modules/gx-plugin/setup.py b/metadata-ingestion-modules/gx-plugin/setup.py
index e87bbded96584e..73d5d1a9a02f18 100644
--- a/metadata-ingestion-modules/gx-plugin/setup.py
+++ b/metadata-ingestion-modules/gx-plugin/setup.py
@@ -15,15 +15,6 @@ def get_long_description():
rest_common = {"requests", "requests_file"}
-# TODO: Can we move away from sqllineage and use sqlglot ??
-sqllineage_lib = {
- "sqllineage==1.3.8",
- # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
- # There have previously been issues from not pinning sqlparse, so it's best to pin it.
- # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
- "sqlparse==0.4.4",
-}
-
_version: str = package_metadata["__version__"]
_self_pin = (
f"=={_version}"
@@ -43,8 +34,7 @@ def get_long_description():
# https://github.com/ipython/traitlets/issues/741
"traitlets<5.2.2",
*rest_common,
- *sqllineage_lib,
- f"acryl-datahub[datahub-rest]{_self_pin}",
+ f"acryl-datahub[datahub-rest,sql-parser]{_self_pin}",
}
mypy_stubs = {
diff --git a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py
index 2ad301a38d0028..2d89d26997d1f3 100644
--- a/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py
+++ b/metadata-ingestion-modules/gx-plugin/src/datahub_gx_plugin/action.py
@@ -34,8 +34,9 @@
)
from datahub.metadata.com.linkedin.pegasus2avro.common import DataPlatformInstance
from datahub.metadata.schema_classes import PartitionSpecClass, PartitionTypeClass
+from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities._markupsafe_compat import MARKUPSAFE_PATCHED
-from datahub.utilities.sql_parser import DefaultSQLParser
+from datahub.utilities.urns.dataset_urn import DatasetUrn
from great_expectations.checkpoint.actions import ValidationAction
from great_expectations.core.batch import Batch
from great_expectations.core.batch_spec import (
@@ -677,10 +678,23 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
query=query,
customProperties=batchSpecProperties,
)
- try:
- tables = DefaultSQLParser(query).get_tables()
- except Exception as e:
- logger.warning(f"Sql parser failed on {query} with {e}")
+
+ data_platform = get_platform_from_sqlalchemy_uri(str(sqlalchemy_uri))
+ sql_parser_in_tables = create_lineage_sql_parsed_result(
+ query=query,
+ platform=data_platform,
+ env=self.env,
+ platform_instance=None,
+ default_db=None,
+ )
+ tables = [
+ DatasetUrn.from_string(table_urn).name
+ for table_urn in sql_parser_in_tables.in_tables
+ ]
+ if sql_parser_in_tables.debug_info.table_error:
+ logger.warning(
+ f"Sql parser failed on {query} with {sql_parser_in_tables.debug_info.table_error}"
+ )
tables = []
if len(set(tables)) != 1:
diff --git a/metadata-ingestion/docs/sources/redash/redash.md b/metadata-ingestion/docs/sources/redash/redash.md
index 8f8c5c85496a09..f23a523cebc913 100644
--- a/metadata-ingestion/docs/sources/redash/redash.md
+++ b/metadata-ingestion/docs/sources/redash/redash.md
@@ -1,5 +1,2 @@
-Note! The integration can use an SQL parser to try to parse the tables the chart depends on. This parsing is disabled by default,
-but can be enabled by setting `parse_table_names_from_sql: true`. The default parser is based on the [`sqllineage`](https://pypi.org/project/sqllineage/) package.
-As this package doesn't officially support all the SQL dialects that Redash supports, the result might not be correct. You can, however, implement a
-custom parser and take it into use by setting the `sql_parser` configuration value. A custom SQL parser must inherit from `datahub.utilities.sql_parser.SQLParser`
-and must be made available to Datahub by ,for example, installing it. The configuration then needs to be set to `module_name.ClassName` of the parser.
+Note! The integration can use an SQL parser to try to determine the tables a chart depends on. This parsing is disabled by default,
+but can be enabled by setting `parse_table_names_from_sql: true`. The parser is based on the [`sqlglot`](https://pypi.org/project/sqlglot/) package.
diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 5ae5438e212c5b..415871d30175f8 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -159,14 +159,6 @@
| classification_lib
)
-sqllineage_lib = {
- "sqllineage==1.3.8",
- # We don't have a direct dependency on sqlparse but it is a dependency of sqllineage.
- # There have previously been issues from not pinning sqlparse, so it's best to pin it.
- # Related: https://github.com/reata/sqllineage/issues/361 and https://github.com/reata/sqllineage/pull/360
- "sqlparse==0.4.4",
-}
-
aws_common = {
# AWS Python SDK
"boto3",
@@ -216,7 +208,6 @@
"sqlalchemy-redshift>=0.8.3",
"GeoAlchemy2",
"redshift-connector>=2.1.0",
- *sqllineage_lib,
*path_spec_common,
}
@@ -464,9 +455,7 @@
# It's technically wrong for packages to depend on setuptools. However, it seems mlflow does it anyways.
"setuptools",
},
- "mode": {"requests", "python-liquid", "tenacity>=8.0.1"}
- | sqllineage_lib
- | sqlglot_lib,
+ "mode": {"requests", "python-liquid", "tenacity>=8.0.1"} | sqlglot_lib,
"mongodb": {"pymongo[srv]>=3.11", "packaging"},
"mssql": sql_common | mssql_common,
"mssql-odbc": sql_common | mssql_common | {"pyodbc"},
@@ -482,7 +471,7 @@
| pyhive_common
| {"psycopg2-binary", "pymysql>=1.0.2"},
"pulsar": {"requests"},
- "redash": {"redash-toolbelt", "sql-metadata"} | sqllineage_lib,
+ "redash": {"redash-toolbelt", "sql-metadata"} | sqlglot_lib,
"redshift": sql_common
| redshift_common
| usage_common
@@ -503,9 +492,7 @@
"slack": slack,
"superset": superset_common,
"preset": superset_common,
- # FIXME: I don't think tableau uses sqllineage anymore so we should be able
- # to remove that dependency.
- "tableau": {"tableauserverclient>=0.24.0"} | sqllineage_lib | sqlglot_lib,
+ "tableau": {"tableauserverclient>=0.24.0"} | sqlglot_lib,
"teradata": sql_common
| usage_common
| sqlglot_lib
@@ -527,9 +514,9 @@
),
"powerbi-report-server": powerbi_report_server,
"vertica": sql_common | {"vertica-sqlalchemy-dialect[vertica-python]==0.0.8.2"},
- "unity-catalog": databricks | sql_common | sqllineage_lib,
+ "unity-catalog": databricks | sql_common,
# databricks is alias for unity-catalog and needs to be kept in sync
- "databricks": databricks | sql_common | sqllineage_lib,
+ "databricks": databricks | sql_common,
"fivetran": snowflake_common | bigquery_common | sqlglot_lib,
"qlik-sense": sqlglot_lib | {"requests", "websocket-client"},
"sigma": sqlglot_lib | {"requests"},
diff --git a/metadata-ingestion/src/datahub/ingestion/source/mode.py b/metadata-ingestion/src/datahub/ingestion/source/mode.py
index e24cba9b193d31..c1ab9271ce13ae 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/mode.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/mode.py
@@ -18,7 +18,6 @@
from requests.adapters import HTTPAdapter, Retry
from requests.exceptions import ConnectionError
from requests.models import HTTPBasicAuth, HTTPError
-from sqllineage.runner import LineageRunner
from tenacity import retry_if_exception_type, stop_after_attempt, wait_exponential
import datahub.emitter.mce_builder as builder
@@ -820,28 +819,6 @@ def _get_definition(self, definition_name):
)
return None
- @lru_cache(maxsize=None)
- def _get_source_from_query(self, raw_query: str) -> set:
- query = self._replace_definitions(raw_query)
- parser = LineageRunner(query)
- source_paths = set()
- try:
- for table in parser.source_tables:
- sources = str(table).split(".")
- source_schema, source_table = sources[-2], sources[-1]
- if source_schema == "":
- source_schema = str(self.config.default_schema)
-
- source_paths.add(f"{source_schema}.{source_table}")
- except Exception as e:
- self.report.report_failure(
- title="Failed to Extract Lineage From Query",
- message="Unable to retrieve lineage from Mode query.",
- context=f"Query: {raw_query}, Error: {str(e)}",
- )
-
- return source_paths
-
def _get_datasource_urn(
self,
platform: str,
diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py
index 581e32d29dceaf..f11d1944029ebb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redash.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -2,7 +2,7 @@
import math
import sys
from dataclasses import dataclass, field
-from typing import Dict, Iterable, List, Optional, Set, Type
+from typing import Dict, Iterable, List, Optional, Set
import dateutil.parser as dp
from packaging import version
@@ -22,7 +22,6 @@
platform_name,
support_status,
)
-from datahub.ingestion.api.registry import import_path
from datahub.ingestion.api.source import Source, SourceCapability, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.common import (
@@ -39,9 +38,9 @@
ChartTypeClass,
DashboardInfoClass,
)
+from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.perf_timer import PerfTimer
-from datahub.utilities.sql_parser_base import SQLParser
from datahub.utilities.threaded_iterator_executor import ThreadedIteratorExecutor
logger = logging.getLogger(__name__)
@@ -270,10 +269,6 @@ class RedashConfig(ConfigModel):
parse_table_names_from_sql: bool = Field(
default=False, description="See note below."
)
- sql_parser: str = Field(
- default="datahub.utilities.sql_parser.DefaultSQLParser",
- description="custom SQL parser. See note below for details.",
- )
env: str = Field(
default=DEFAULT_ENV,
@@ -354,7 +349,6 @@ def __init__(self, ctx: PipelineContext, config: RedashConfig):
self.api_page_limit = self.config.api_page_limit or math.inf
self.parse_table_names_from_sql = self.config.parse_table_names_from_sql
- self.sql_parser_path = self.config.sql_parser
logger.info(
f"Running Redash ingestion with parse_table_names_from_sql={self.parse_table_names_from_sql}"
@@ -380,31 +374,6 @@ def create(cls, config_dict: dict, ctx: PipelineContext) -> Source:
config = RedashConfig.parse_obj(config_dict)
return cls(ctx, config)
- @classmethod
- def _import_sql_parser_cls(cls, sql_parser_path: str) -> Type[SQLParser]:
- assert "." in sql_parser_path, "sql_parser-path must contain a ."
- parser_cls = import_path(sql_parser_path)
-
- if not issubclass(parser_cls, SQLParser):
- raise ValueError(f"must be derived from {SQLParser}; got {parser_cls}")
- return parser_cls
-
- @classmethod
- def _get_sql_table_names(cls, sql: str, sql_parser_path: str) -> List[str]:
- parser_cls = cls._import_sql_parser_cls(sql_parser_path)
-
- try:
- sql_table_names: List[str] = parser_cls(sql).get_tables()
- except Exception as e:
- logger.warning(f"Sql parser failed on {sql} with {e}")
- return []
-
- # Remove quotes from table names
- sql_table_names = [t.replace('"', "") for t in sql_table_names]
- sql_table_names = [t.replace("`", "") for t in sql_table_names]
-
- return sql_table_names
-
def _get_chart_data_source(self, data_source_id: Optional[int] = None) -> Dict:
url = f"/api/data_sources/{data_source_id}"
resp = self.client._get(url).json()
@@ -441,14 +410,6 @@ def _get_database_name_based_on_datasource(
return database_name
- def _construct_datalineage_urn(
- self, platform: str, database_name: str, sql_table_name: str
- ) -> str:
- full_dataset_name = get_full_qualified_name(
- platform, database_name, sql_table_name
- )
- return builder.make_dataset_urn(platform, full_dataset_name, self.config.env)
-
def _get_datasource_urns(
self, data_source: Dict, sql_query_data: Dict = {}
) -> Optional[List[str]]:
@@ -464,34 +425,23 @@ def _get_datasource_urns(
# Getting table lineage from SQL parsing
if self.parse_table_names_from_sql and data_source_syntax == "sql":
dataset_urns = list()
- try:
- sql_table_names = self._get_sql_table_names(
- query, self.sql_parser_path
- )
- except Exception as e:
+ sql_parser_in_tables = create_lineage_sql_parsed_result(
+ query=query,
+ platform=platform,
+ env=self.config.env,
+ platform_instance=None,
+ default_db=database_name,
+ )
+            # make sure dataset_urns is not an empty list
+ dataset_urns = sql_parser_in_tables.in_tables
+ if sql_parser_in_tables.debug_info.table_error:
self.report.queries_problem_parsing.add(str(query_id))
self.error(
logger,
"sql-parsing",
- f"exception {e} in parsing query-{query_id}-datasource-{data_source_id}",
+ f"exception {sql_parser_in_tables.debug_info.table_error} in parsing query-{query_id}-datasource-{data_source_id}",
)
- sql_table_names = []
- for sql_table_name in sql_table_names:
- try:
- dataset_urns.append(
- self._construct_datalineage_urn(
- platform, database_name, sql_table_name
- )
- )
- except Exception:
- self.report.queries_problem_parsing.add(str(query_id))
- self.warn(
- logger,
- "data-urn-invalid",
- f"Problem making URN for {sql_table_name} parsed from query {query_id}",
- )
- # make sure dataset_urns is not empty list
return dataset_urns if len(dataset_urns) > 0 else None
else:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
index 8c42ac81b98cf5..718818d9b347bf 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
@@ -7,7 +7,6 @@
import pyspark
from databricks.sdk.service.sql import QueryStatementType
-from sqllineage.runner import LineageRunner
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source_helpers import auto_empty_dataset_usage_statistics
@@ -22,7 +21,9 @@
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.ingestion.source.usage.usage_common import UsageAggregator
from datahub.metadata.schema_classes import OperationClass
+from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result
from datahub.sql_parsing.sqlglot_utils import get_query_fingerprint
+from datahub.utilities.urns.dataset_urn import DatasetUrn
logger = logging.getLogger(__name__)
@@ -48,6 +49,7 @@ class UnityCatalogUsageExtractor:
proxy: UnityCatalogApiProxy
table_urn_builder: Callable[[TableReference], str]
user_urn_builder: Callable[[str], str]
+ platform: str = "databricks"
def __post_init__(self):
self.usage_aggregator = UsageAggregator[TableReference](self.config)
@@ -173,7 +175,7 @@ def _parse_query(
self, query: Query, table_map: TableMap
) -> Optional[QueryTableInfo]:
with self.report.usage_perf_report.sql_parsing_timer:
- table_info = self._parse_query_via_lineage_runner(query.query_text)
+ table_info = self._parse_query_via_sqlglot(query.query_text)
if table_info is None and query.statement_type == QueryStatementType.SELECT:
with self.report.usage_perf_report.spark_sql_parsing_timer:
table_info = self._parse_query_via_spark_sql_plan(query.query_text)
@@ -191,26 +193,33 @@ def _parse_query(
),
)
- def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
+ def _parse_query_via_sqlglot(self, query: str) -> Optional[StringTableInfo]:
try:
- runner = LineageRunner(query)
+ sql_parser_in_tables = create_lineage_sql_parsed_result(
+ query=query,
+ default_db=None,
+ platform=self.platform,
+ env=self.config.env,
+ platform_instance=None,
+ )
+
return GenericTableInfo(
source_tables=[
- self._parse_sqllineage_table(table)
- for table in runner.source_tables
+ self._parse_sqlglot_table(table)
+ for table in sql_parser_in_tables.in_tables
],
target_tables=[
- self._parse_sqllineage_table(table)
- for table in runner.target_tables
+ self._parse_sqlglot_table(table)
+ for table in sql_parser_in_tables.out_tables
],
)
except Exception as e:
- logger.info(f"Could not parse query via lineage runner, {query}: {e!r}")
+ logger.info(f"Could not parse query via sqlglot, {query}: {e!r}")
return None
@staticmethod
- def _parse_sqllineage_table(sqllineage_table: object) -> str:
- full_table_name = str(sqllineage_table)
+ def _parse_sqlglot_table(table_urn: str) -> str:
+ full_table_name = DatasetUrn.from_string(table_urn).name
        default_schema = "<default>."
if full_table_name.startswith(default_schema):
return full_table_name[len(default_schema) :]
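
For reference, a minimal sketch of the URN-to-name conversion that the new `_parse_sqlglot_table` helper performs (the URN below is hypothetical):

```python
from datahub.utilities.urns.dataset_urn import DatasetUrn

# Hypothetical URN of the shape found in the parsed result's in_tables / out_tables.
urn = "urn:li:dataset:(urn:li:dataPlatform:databricks,main.sales.orders,PROD)"
full_table_name = DatasetUrn.from_string(urn).name  # "main.sales.orders"

# Names carrying the default-schema prefix have it stripped, matching the helper above.
default_schema = "<default>."
if full_table_name.startswith(default_schema):
    full_table_name = full_table_name[len(default_schema):]
```
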
diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
deleted file mode 100644
index 5a8802c7a0a49c..00000000000000
--- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
+++ /dev/null
@@ -1,160 +0,0 @@
-import contextlib
-import logging
-import re
-import unittest
-import unittest.mock
-from typing import Dict, List, Optional, Set
-
-from sqllineage.core.holders import Column, SQLLineageHolder
-from sqllineage.exceptions import SQLLineageException
-
-from datahub.utilities.sql_parser_base import SQLParser, SqlParserException
-
-with contextlib.suppress(ImportError):
- import sqlparse
- from networkx import DiGraph
- from sqllineage.core import LineageAnalyzer
-
- import datahub.utilities.sqllineage_patch
-logger = logging.getLogger(__name__)
-
-
-class SqlLineageSQLParserImpl(SQLParser):
- _DATE_SWAP_TOKEN = "__d_a_t_e"
- _HOUR_SWAP_TOKEN = "__h_o_u_r"
- _TIMESTAMP_SWAP_TOKEN = "__t_i_m_e_s_t_a_m_p"
- _DATA_SWAP_TOKEN = "__d_a_t_a"
- _ADMIN_SWAP_TOKEN = "__a_d_m_i_n"
- _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
- _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"
-
- def __init__(self, sql_query: str, use_raw_names: bool = False) -> None:
- super().__init__(sql_query)
- original_sql_query = sql_query
- self._use_raw_names = use_raw_names
-
- # SqlLineageParser makes mistakes on lateral flatten queries, use the prefix
- if "lateral flatten" in sql_query:
- sql_query = sql_query[: sql_query.find("lateral flatten")]
-
- # Replace reserved words that break SqlLineageParser
- self.token_to_original: Dict[str, str] = {
- self._DATE_SWAP_TOKEN: "date",
- self._HOUR_SWAP_TOKEN: "hour",
- self._TIMESTAMP_SWAP_TOKEN: "timestamp",
- self._DATA_SWAP_TOKEN: "data",
- self._ADMIN_SWAP_TOKEN: "admin",
- }
- for replacement, original in self.token_to_original.items():
- # Replace original tokens with replacement. Since table and column name can contain a hyphen('-'),
- # also prevent original tokens appearing as part of these names with a hyphen from getting substituted.
- sql_query = re.sub(
- rf"((? List[str]:
- result: List[str] = []
- if self._sql_holder is None:
- logger.error("sql holder not present so cannot get tables")
- return result
- for table in self._sql_holder.source_tables:
- table_normalized = re.sub(
-                r"^<default>.",
- "",
- (
- str(table)
- if not self._use_raw_names
- else f"{table.schema.raw_name}.{table.raw_name}"
- ),
- )
- result.append(str(table_normalized))
-
- # We need to revert TOKEN replacements
- for token, replacement in self.token_to_original.items():
- result = [replacement if c == token else c for c in result]
- result = [
- self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c
- for c in result
- ]
-
- # Sort tables to make the list deterministic
- result.sort()
-
- return result
-
- def get_columns(self) -> List[str]:
- if self._sql_holder is None:
- raise SqlParserException("sql holder not present so cannot get columns")
- graph: DiGraph = self._sql_holder.graph # For mypy attribute checking
- column_nodes = [n for n in graph.nodes if isinstance(n, Column)]
- column_graph = graph.subgraph(column_nodes)
-
- target_columns = {column for column, deg in column_graph.out_degree if deg == 0}
-
- result: Set[str] = set()
- for column in target_columns:
- # Let's drop all the count(*) and similard columns which are expression actually if it does not have an alias
- if not any(ele in column.raw_name for ele in ["*", "(", ")"]):
- result.add(str(column.raw_name))
-
- # Reverting back all the previously renamed words which confuses the parser
- result = {"date" if c == self._DATE_SWAP_TOKEN else c for c in result}
- result = {
- "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c for c in list(result)
- }
-
- # swap back renamed date column
- return list(result)
diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py
deleted file mode 100644
index b88f8fd8c73029..00000000000000
--- a/metadata-ingestion/src/datahub/utilities/sql_parser.py
+++ /dev/null
@@ -1,94 +0,0 @@
-import logging
-import multiprocessing
-import traceback
-from multiprocessing import Process, Queue
-from typing import Any, List, Optional, Tuple
-
-from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
-from datahub.utilities.sql_parser_base import SQLParser
-
-logger = logging.getLogger(__name__)
-
-
-def sql_lineage_parser_impl_func_wrapper(
- queue: Optional[multiprocessing.Queue], sql_query: str, use_raw_names: bool = False
-) -> Optional[Tuple[List[str], List[str], Any]]:
- """
- The wrapper function that computes the tables and columns using the SqlLineageSQLParserImpl
- and puts the results on the shared IPC queue. This is used to isolate SqlLineageSQLParserImpl
- functionality in a separate process, and hence protect our sources from memory leaks originating in
- the sqllineage module.
- :param queue: The shared IPC queue on to which the results will be put.
- :param sql_query: The SQL query to extract the tables & columns from.
- :param use_raw_names: Parameter used to ignore sqllineage's default lowercasing.
- :return: None.
- """
- exception_details: Optional[Tuple[BaseException, str]] = None
- tables: List[str] = []
- columns: List[str] = []
- try:
- parser = SqlLineageSQLParserImpl(sql_query, use_raw_names)
- tables = parser.get_tables()
- columns = parser.get_columns()
- except BaseException as e:
- exc_msg = traceback.format_exc()
- exception_details = (e, exc_msg)
- logger.debug(exc_msg)
-
- if queue is not None:
- queue.put((tables, columns, exception_details))
- return None
- else:
- return (tables, columns, exception_details)
-
-
-class SqlLineageSQLParser(SQLParser):
- def __init__(
- self,
- sql_query: str,
- use_external_process: bool = False,
- use_raw_names: bool = False,
- ) -> None:
- super().__init__(sql_query, use_external_process)
- if use_external_process:
- self.tables, self.columns = self._get_tables_columns_process_wrapped(
- sql_query, use_raw_names
- )
- else:
- return_tuple = sql_lineage_parser_impl_func_wrapper(
- None, sql_query, use_raw_names
- )
- if return_tuple is not None:
- (
- self.tables,
- self.columns,
- some_exception,
- ) = return_tuple
-
- @staticmethod
- def _get_tables_columns_process_wrapped(
- sql_query: str, use_raw_names: bool = False
- ) -> Tuple[List[str], List[str]]:
- # Invoke sql_lineage_parser_impl_func_wrapper in a separate process to avoid
- # memory leaks from sqllineage module used by SqlLineageSQLParserImpl. This will help
- # shield our sources like lookml & redash, that need to parse a large number of SQL statements,
- # from causing significant memory leaks in the datahub cli during ingestion.
- queue: multiprocessing.Queue = Queue()
- process: multiprocessing.Process = Process(
- target=sql_lineage_parser_impl_func_wrapper,
- args=(queue, sql_query, use_raw_names),
- )
- process.start()
- tables, columns, exception_details = queue.get(block=True)
- if exception_details is not None:
- raise exception_details[0](f"Sub-process exception: {exception_details[1]}")
- return tables, columns
-
- def get_tables(self) -> List[str]:
- return self.tables
-
- def get_columns(self) -> List[str]:
- return self.columns
-
-
-DefaultSQLParser = SqlLineageSQLParser
diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser_base.py b/metadata-ingestion/src/datahub/utilities/sql_parser_base.py
deleted file mode 100644
index 8fd5dfaf4978d1..00000000000000
--- a/metadata-ingestion/src/datahub/utilities/sql_parser_base.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from abc import ABCMeta, abstractmethod
-from typing import List
-
-
-class SqlParserException(Exception):
- """Raised when sql parser fails"""
-
- pass
-
-
-class SQLParser(metaclass=ABCMeta):
- def __init__(self, sql_query: str, use_external_process: bool = True) -> None:
- self._sql_query = sql_query
-
- @abstractmethod
- def get_tables(self) -> List[str]:
- pass
-
- @abstractmethod
- def get_columns(self) -> List[str]:
- pass
diff --git a/metadata-ingestion/tests/unit/test_redash_source.py b/metadata-ingestion/tests/unit/test_redash_source.py
index 2982fe76c4d4e7..32ab200847dc6c 100644
--- a/metadata-ingestion/tests/unit/test_redash_source.py
+++ b/metadata-ingestion/tests/unit/test_redash_source.py
@@ -710,9 +710,9 @@ def test_get_chart_snapshot_parse_table_names_from_sql(mocked_data_source):
),
chartUrl="http://localhost:5000/queries/4#10",
inputs=[
- "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.order_items,PROD)",
- "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.orders,PROD)",
- "urn:li:dataset:(urn:li:dataPlatform:mysql,Rfam.staffs,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.order_items,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.orders,PROD)",
+ "urn:li:dataset:(urn:li:dataPlatform:mysql,rfam.staffs,PROD)",
],
type="PIE",
)
diff --git a/metadata-ingestion/tests/unit/utilities/test_utilities.py b/metadata-ingestion/tests/unit/utilities/test_utilities.py
index 68da1bc1c01be2..91819bff41e629 100644
--- a/metadata-ingestion/tests/unit/utilities/test_utilities.py
+++ b/metadata-ingestion/tests/unit/utilities/test_utilities.py
@@ -1,8 +1,55 @@
import doctest
+import re
+from typing import List
+from datahub.sql_parsing.schema_resolver import SchemaResolver
+from datahub.sql_parsing.sqlglot_lineage import sqlglot_lineage
from datahub.utilities.delayed_iter import delayed_iter
from datahub.utilities.is_pytest import is_pytest_running
-from datahub.utilities.sql_parser import SqlLineageSQLParser
+from datahub.utilities.urns.dataset_urn import DatasetUrn
+
+
+class SqlLineageSQLParser:
+ """
+    Uses `sqlglot_lineage` to extract tables and columns, serving as a replacement for the removed `sqllineage`-based implementation and following the same approach as the BigQuery SQL lineage tests.
+    Reference: [BigQuery SQL Lineage Test](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/tests/unit/bigquery/test_bigquery_sql_lineage.py#L8).
+ """
+
+ _MYVIEW_SQL_TABLE_NAME_TOKEN = "__my_view__.__sql_table_name__"
+ _MYVIEW_LOOKER_TOKEN = "my_view.SQL_TABLE_NAME"
+
+ def __init__(self, sql_query: str, platform: str = "bigquery") -> None:
+        # The SQL parser lowercases table names, so swap out the Looker-specific token, which needs to keep its casing
+ sql_query = re.sub(
+ rf"(\${{{self._MYVIEW_LOOKER_TOKEN}}})",
+ rf"{self._MYVIEW_SQL_TABLE_NAME_TOKEN}",
+ sql_query,
+ )
+ self.sql_query = sql_query
+ self.schema_resolver = SchemaResolver(platform=platform)
+ self.result = sqlglot_lineage(sql_query, self.schema_resolver)
+
+ def get_tables(self) -> List[str]:
+ ans = []
+ for urn in self.result.in_tables:
+ table_ref = DatasetUrn.from_string(urn)
+ ans.append(str(table_ref.name))
+
+ result = [
+ self._MYVIEW_LOOKER_TOKEN if c == self._MYVIEW_SQL_TABLE_NAME_TOKEN else c
+ for c in ans
+ ]
+ # Sort tables to make the list deterministic
+ result.sort()
+
+ return result
+
+ def get_columns(self) -> List[str]:
+ ans = []
+ for col_info in self.result.column_lineage or []:
+ for col_ref in col_info.upstreams:
+ ans.append(col_ref.column)
+ return ans
def test_delayed_iter():
@@ -121,7 +168,7 @@ def test_sqllineage_sql_parser_get_columns_with_alias_and_count_star():
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
- assert columns_list == ["a", "b", "count", "test"]
+ assert columns_list == ["a", "b", "c"]
def test_sqllineage_sql_parser_get_columns_with_more_complex_join():
@@ -145,7 +192,7 @@ def test_sqllineage_sql_parser_get_columns_with_more_complex_join():
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
- assert columns_list == ["bs", "pi", "pt", "pu", "v"]
+ assert columns_list == ["bs", "pi", "tt", "tt", "v"]
def test_sqllineage_sql_parser_get_columns_complex_query_with_union():
@@ -198,7 +245,7 @@ def test_sqllineage_sql_parser_get_columns_complex_query_with_union():
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
- assert columns_list == ["c", "date", "e", "u", "x"]
+ assert columns_list == ["c", "c", "e", "e", "e", "e", "u", "u", "x", "x"]
def test_sqllineage_sql_parser_get_tables_from_templated_query():
@@ -239,7 +286,7 @@ def test_sqllineage_sql_parser_with_weird_lookml_query():
"""
columns_list = SqlLineageSQLParser(sql_query).get_columns()
columns_list.sort()
- assert columns_list == ["aliased_platform", "country", "date"]
+ assert columns_list == []
def test_sqllineage_sql_parser_tables_from_redash_query():
@@ -276,13 +323,7 @@ def test_sqllineage_sql_parser_tables_with_special_names():
"hour-table",
"timestamp-table",
]
- expected_columns = [
- "column-admin",
- "column-data",
- "column-date",
- "column-hour",
- "column-timestamp",
- ]
+ expected_columns: List[str] = []
assert sorted(SqlLineageSQLParser(sql_query).get_tables()) == expected_tables
assert sorted(SqlLineageSQLParser(sql_query).get_columns()) == expected_columns
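
For context, the test-local `SqlLineageSQLParser` wrapper added in `test_utilities.py` above can be exercised roughly as follows; this is a sketch, the query is hypothetical, and the exact column output depends on sqlglot's column-lineage resolution:

```python
parser = SqlLineageSQLParser("SELECT a, b, c AS total FROM schema1.table1 WHERE d > 0")
parser.get_tables()   # table names recovered from in_tables URNs, e.g. ["schema1.table1"]
parser.get_columns()  # upstream columns from sqlglot's column lineage, e.g. ["a", "b", "c"]
```
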