Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[sc-27604] Parse TLL for UnityCatalog query logs #919

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ repos:
language: system
pass_filenames: false
always_run: true

- repo: https://github.com/python-poetry/poetry
rev: 1.8.3
hooks:
- id: poetry-lock
args: [--no-update]
- id: poetry-export
args: [--all-extras, --output=requirements.txt]
usefulalgorithm marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ RUN apt-get clean
RUN apt-get update
RUN apt-get install -y git build-essential libsasl2-dev

COPY ./requirements.txt /src/requirements.txt

RUN pip install -r /src/requirements.txt --no-deps

RUN rm -rf /src

COPY . /src

RUN pip install '/src[all]'
Expand Down
7 changes: 0 additions & 7 deletions metaphor/common/process_query/__init__.py

This file was deleted.

5 changes: 0 additions & 5 deletions metaphor/common/process_query/preprocess/__init__.py

This file was deleted.

File renamed without changes.
15 changes: 15 additions & 0 deletions metaphor/common/sql/dialect.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from metaphor.models.metadata_change_event import DataPlatform

PLATFORM_TO_DIALECT = {
DataPlatform.BIGQUERY: "bigquery",
DataPlatform.HIVE: "hive",
DataPlatform.MSSQL: "tsql",
DataPlatform.MYSQL: "mysql",
DataPlatform.POSTGRESQL: "postgres",
DataPlatform.REDSHIFT: "redshift",
DataPlatform.SNOWFLAKE: "snowflake",
DataPlatform.UNITY_CATALOG: "databricks",
}
"""
Mapping from data platforms supported by Metaphor to dialects recognized by SQLGlot.
"""
7 changes: 7 additions & 0 deletions metaphor/common/sql/process_query/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from metaphor.common.sql.process_query.config import ProcessQueryConfig
from metaphor.common.sql.process_query.process_query import process_query

__all__ = [
"process_query",
"ProcessQueryConfig",
]
5 changes: 5 additions & 0 deletions metaphor/common/sql/process_query/preprocess/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from metaphor.common.sql.process_query.preprocess.preprocess import preprocess

__all__ = [
"preprocess",
]
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from metaphor.common.process_query.preprocess.snowflake import (
from metaphor.common.sql.process_query.preprocess.snowflake import (
drop_snowflake_copy_into_options,
)
from metaphor.models.metadata_change_event import DataPlatform
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,14 @@
from sqlglot.errors import SqlglotError

from metaphor.common.logger import get_logger
from metaphor.common.process_query.bad_queries import is_bad_query_pattern
from metaphor.common.process_query.config import ProcessQueryConfig
from metaphor.common.process_query.preprocess import preprocess
from metaphor.common.sql.dialect import PLATFORM_TO_DIALECT
from metaphor.common.sql.process_query.bad_queries import is_bad_query_pattern
from metaphor.common.sql.process_query.config import ProcessQueryConfig
from metaphor.common.sql.process_query.preprocess import preprocess
from metaphor.models.metadata_change_event import DataPlatform

logger = get_logger()

_PLATFORM_TO_DIALECT = {
DataPlatform.BIGQUERY: "bigquery",
DataPlatform.HIVE: "hive",
DataPlatform.MSSQL: "tsql",
DataPlatform.MYSQL: "mysql",
DataPlatform.POSTGRESQL: "postgres",
DataPlatform.REDSHIFT: "redshift",
DataPlatform.SNOWFLAKE: "snowflake",
DataPlatform.UNITY_CATALOG: "databricks",
}


def _redact_literal_values_in_where_clauses(
expression: Expression, config: ProcessQueryConfig
Expand Down Expand Up @@ -85,7 +75,7 @@ def process_query(
if is_bad_query_pattern(sql):
return sql

dialect = _PLATFORM_TO_DIALECT.get(data_platform)
dialect = PLATFORM_TO_DIALECT.get(data_platform)

try:
updated = preprocess(sql, data_platform)
Expand Down
5 changes: 5 additions & 0 deletions metaphor/common/sql/table_level_lineage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .table_level_lineage import extract_table_level_lineage

__all__ = [
"extract_table_level_lineage",
]
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import Optional, Set

from sqlglot import Expression, exp

from metaphor.common.sql.table_level_lineage.table import Table


def _handle_create_like(
expression: Expression,
) -> Optional[Set[Table]]:
if not isinstance(expression, exp.Create):
return None
if "properties" not in expression.args:
return None

Check warning on line 14 in metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py#L14

Added line #L14 was not covered by tests
props = expression.args["properties"]
if not isinstance(props, exp.Properties):
return None
if not len(props.expressions) == 1:
return None
prop = props.expressions[0]
if not isinstance(prop, exp.LikeProperty) or not isinstance(prop.this, exp.Table):
return None

Check warning on line 22 in metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py#L22

Added line #L22 was not covered by tests
return {Table.from_sqlglot_table(prop.this)}


def _handle_create_clone(
expression: Expression,
) -> Optional[Set[Table]]:
if not isinstance(expression, exp.Create):
return None
if "clone" not in expression.args:
return None

Check warning on line 32 in metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py#L32

Added line #L32 was not covered by tests
clone = expression.args["clone"]
if not isinstance(clone, exp.Clone):
return None
if not isinstance(clone.this, exp.Table):
return None

Check warning on line 37 in metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py#L37

Added line #L37 was not covered by tests
return {Table.from_sqlglot_table(clone.this)}


def find_target_in_select_into(
expression: Expression,
) -> Set[Table]:
res: Set[Table] = set()
for select in expression.find_all(exp.Select):
if "into" not in select.args:
continue
into = select.args["into"]
if not isinstance(into, exp.Into) or not isinstance(into.this, exp.Table):
continue

Check warning on line 50 in metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/expression_handlers.py#L50

Added line #L50 was not covered by tests
res.add(Table.from_sqlglot_table(into.this))
return res


expression_handlers = [
_handle_create_clone,
_handle_create_like,
]
"""
Some expressions don't have SELECT in them but they still have
lineage that we want to know. We parse these expressions separately.
"""
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
from typing import List, Set

import sqlglot.lineage
import sqlglot.optimizer.qualify
import sqlglot.optimizer.scope
from sqlglot import exp

from metaphor.common.logger import get_logger

logger = get_logger()


def _extract_select_from_insert(
statement: exp.Insert,
) -> exp.Expression:
if isinstance(statement.expression, exp.Values):
# For INSERT INTO ... VALUES ..., just join all the
# subqueries into a big select since this is just for
# TLL
select_statement = exp.Select().select("*")
subqueries: List[exp.Subquery] = []
values = statement.expression
for tup in values.expressions:
if isinstance(tup, exp.Tuple):
for expr in tup.expressions:
if isinstance(expr, exp.Subquery):
subqueries.append(expr)
for i, subquery in enumerate(subqueries):
if not i:
select_statement = select_statement.from_(
subquery.as_(f"_subquery_{i}")
)
else:
select_statement = select_statement.join(subquery.as_(f"_subquery_{i}"))
return select_statement

if not isinstance(statement.expression, exp.Select):
logger.warning(
f"SELECT not directly in INSERT INTO, instead found {statement.expression.sql()}"
)
return statement.expression


_UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT: Set[str] = set(exp.Update.arg_types.keys()) - set(
exp.Select.arg_types.keys()
)
_UPDATE_FROM_TABLE_ARGS_TO_MOVE = {"joins", "laterals", "pivot"}


def _extract_select_from_update(
statement: exp.Update,
) -> exp.Select:
statement = statement.copy()

# The "SET" expressions need to be converted.
# For the update command, it'll be a list of EQ expressions, but the select
# should contain aliased columns.
new_expressions = []
for expr in statement.expressions:
if isinstance(expr, exp.EQ) and isinstance(expr.left, exp.Column):
new_expressions.append(
exp.Alias(
this=expr.right,
alias=expr.left.this,
)
)
else:
# If we don't know how to convert it, just leave it as-is. If this causes issues,
# they'll get caught later.
new_expressions.append(expr)

Check warning on line 70 in metaphor/common/sql/table_level_lineage/helpers/find_select_in_expression.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/find_select_in_expression.py#L70

Added line #L70 was not covered by tests

# Special translation for the `from` clause.
extra_args = {}
original_from = statement.args.get("from")
if original_from and isinstance(original_from.this, exp.Table):
# Move joins, laterals, and pivots from the Update->From->Table->field
# to the top-level Select->field.

for k in _UPDATE_FROM_TABLE_ARGS_TO_MOVE:
if k in original_from.this.args:
# Mutate the from table clause in-place.
extra_args[k] = original_from.this.args.pop(k)

Check warning on line 82 in metaphor/common/sql/table_level_lineage/helpers/find_select_in_expression.py

View check run for this annotation

Codecov / codecov/patch

metaphor/common/sql/table_level_lineage/helpers/find_select_in_expression.py#L82

Added line #L82 was not covered by tests

select_statement = exp.Select(
**{
**{
k: v
for k, v in statement.args.items()
if k not in _UPDATE_ARGS_NOT_SUPPORTED_BY_SELECT
},
**extra_args,
"expressions": new_expressions,
}
)

# Update statements always implicitly have the updated table in context.
# TODO: Retain table name alias, if one was present.
if select_statement.args.get("from"):
select_statement = select_statement.join(
statement.this, append=True, join_kind="cross"
)
else:
select_statement = select_statement.from_(statement.this)

return select_statement


def _extract_select_from_create(
statement: exp.Create,
) -> exp.Expression:
# TODO: Validate that this properly includes WITH clauses in all dialects.
inner = statement.expression

if inner:
return inner
else:
return statement


def find_select_in_expression(expression: sqlglot.Expression) -> sqlglot.Expression:
# Try to extract the core select logic from a more complex statement.
# If it fails, just return the original statement.

if isinstance(expression, exp.Merge):
# TODO Need to map column renames in the expressions part of the statement.
# Likely need to use the named_selects attr.
expression = expression.args["using"]
if isinstance(expression, exp.Table):
# If we're querying a table directly, wrap it in a SELECT.
expression = exp.Select().select("*").from_(expression)
elif isinstance(expression, exp.Insert):
# TODO Need to map column renames in the expressions part of the statement.
expression = _extract_select_from_insert(expression)
elif isinstance(expression, exp.Update):
# Assumption: the output table is already captured in the modified tables list.
expression = _extract_select_from_update(expression)
elif isinstance(expression, exp.Create):
# TODO May need to map column renames.
# Assumption: the output table is already captured in the modified tables list.
expression = _extract_select_from_create(expression)

if isinstance(expression, exp.Subquery):
expression = expression.unnest()

return expression
12 changes: 12 additions & 0 deletions metaphor/common/sql/table_level_lineage/result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from dataclasses import field
from typing import List

from pydantic.dataclasses import dataclass

from metaphor.models.metadata_change_event import QueriedDataset


@dataclass
class Result:
targets: List[QueriedDataset] = field(default_factory=list)
sources: List[QueriedDataset] = field(default_factory=list)
36 changes: 36 additions & 0 deletions metaphor/common/sql/table_level_lineage/table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Optional

from pydantic.dataclasses import dataclass
from sqlglot import exp

from metaphor.common.entity_id import dataset_normalized_name, to_dataset_entity_id
from metaphor.models.metadata_change_event import DataPlatform, QueriedDataset


@dataclass(frozen=True)
class Table:
db: Optional[str]
schema: Optional[str]
table: str

@classmethod
def from_sqlglot_table(cls, table: exp.Table):
return cls(
db=table.catalog,
schema=table.db,
table=table.name,
)

def to_queried_dataset(self, platform: DataPlatform, account: Optional[str]):
return QueriedDataset(
database=self.db,
schema=self.schema,
table=self.table,
id=str(
to_dataset_entity_id(
dataset_normalized_name(self.db, self.schema, self.table),
platform,
account,
)
),
)
Loading
Loading