From 511866eb0cb037067f685b5d96e7fe310d167c8e Mon Sep 17 00:00:00 2001 From: Mars Lan Date: Tue, 17 Oct 2023 05:59:35 -0700 Subject: [PATCH] Fix Unity Catalog parsing errors due to permission issues (#634) --- metaphor/unity_catalog/extractor.py | 26 ++++++++++++++++++++++--- metaphor/unity_catalog/models.py | 28 +++++++++++++++++++++------ metaphor/unity_catalog/utils.py | 23 ++++++++++------------ poetry.lock | 23 +++++++++++----------- pyproject.toml | 4 ++-- tests/unity_catalog/test_extractor.py | 14 +++++++++++--- 6 files changed, 80 insertions(+), 38 deletions(-) diff --git a/metaphor/unity_catalog/extractor.py b/metaphor/unity_catalog/extractor.py index 544bf2ee..16da37bd 100644 --- a/metaphor/unity_catalog/extractor.py +++ b/metaphor/unity_catalog/extractor.py @@ -11,7 +11,7 @@ ) from metaphor.common.event_util import ENTITY_TYPES from metaphor.common.filter import DatasetFilter -from metaphor.common.logger import get_logger +from metaphor.common.logger import get_logger, json_dump_to_debug_file from metaphor.common.utils import unique_list from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( @@ -30,7 +30,12 @@ SQLSchema, ) from metaphor.unity_catalog.config import UnityCatalogRunConfig -from metaphor.unity_catalog.models import Table, TableType, parse_table_from_object +from metaphor.unity_catalog.models import ( + NoPermission, + Table, + TableType, + parse_table_from_object, +) from metaphor.unity_catalog.utils import list_column_lineage, list_table_lineage logger = get_logger() @@ -97,6 +102,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]: def _get_catalogs(self) -> List[str]: response = self._api.list_catalogs() + json_dump_to_debug_file(response, "list-catalogs.json") + catalogs = [] for catalog in response.get("catalogs", []): if "name" in catalog: @@ -105,6 +112,8 @@ def _get_catalogs(self) -> List[str]: def _get_schemas(self, catalog: str) -> List[str]: response = 
self._api.list_schemas(catalog_name=catalog, name_pattern=None) + json_dump_to_debug_file(response, f"list-schemas-{catalog}.json") + schemas = [] for schema in response.get("schemas", []): if "name" in schema: @@ -115,6 +124,7 @@ def _get_tables(self, catalog: str, schema: str) -> Generator[Table, None, None] response = self._api.list_tables( catalog_name=catalog, schema_name=schema, name_pattern=None ) + json_dump_to_debug_file(response, f"list-tables-{catalog}-{schema}.json") for table in response.get("tables", []): yield parse_table_from_object(table) @@ -168,11 +178,12 @@ def _populate_lineage(self, dataset: Dataset): # Skip table without upstream if not lineage.upstreams: - logging.info(f"Skip table: {table_name} for no upstream") + logging.info(f"Table {table_name} has no upstream") return source_datasets = [] field_mappings = [] + has_permission_issues = False for field in dataset.schema.fields: column_name = field.field_name column_lineage = list_column_lineage( @@ -181,6 +192,10 @@ def _populate_lineage(self, dataset: Dataset): field_mapping = FieldMapping(destination=column_name, sources=[]) for upstream_col in column_lineage.upstream_cols: + if isinstance(upstream_col, NoPermission): + has_permission_issues = True + continue + normalized_name = dataset_normalized_name( upstream_col.catalog_name, upstream_col.schema_name, @@ -202,6 +217,11 @@ def _populate_lineage(self, dataset: Dataset): ) field_mappings.append(field_mapping) + if has_permission_issues: + logger.error( + f"Unable to extract lineage for {table_name} due to permission issues" + ) + dataset.upstream = DatasetUpstream( source_datasets=unique_list(source_datasets), field_mappings=field_mappings ) diff --git a/metaphor/unity_catalog/models.py b/metaphor/unity_catalog/models.py index 293ca68e..cf865035 100644 --- a/metaphor/unity_catalog/models.py +++ b/metaphor/unity_catalog/models.py @@ -1,8 +1,8 @@ import json from enum import Enum -from typing import List, Optional +from typing import 
List, Optional, Union -from pydantic import BaseModel, parse_obj_as +from pydantic import BaseModel, parse_obj_as, validator from metaphor.common.logger import get_logger from metaphor.models.metadata_change_event import CustomMetadataItem @@ -71,10 +71,20 @@ def extra_metadata(self) -> List[CustomMetadataItem]: def parse_table_from_object(obj: object): - logger.debug(f"table object: {json.dumps(obj)}") return parse_obj_as(Table, obj) +class NoPermission(BaseModel): + has_permission: bool = False + + @validator("has_permission") + def has_permission_must_be_false(cls, value): + if value is False: + return value + + raise ValueError("has_permission must be False") + + class LineageColumnInfo(BaseModel): name: str catalog_name: str @@ -83,8 +93,13 @@ class LineageColumnInfo(BaseModel): class ColumnLineage(BaseModel): - upstream_cols: List[LineageColumnInfo] = [] - downstream_cols: List[LineageColumnInfo] = [] + upstream_cols: List[Union[LineageColumnInfo, NoPermission]] = [] + downstream_cols: List[Union[LineageColumnInfo, NoPermission]] = [] + + +class FileInfo(BaseModel): + path: str + has_permission: bool class TableInfo(BaseModel): @@ -94,7 +109,8 @@ class TableInfo(BaseModel): class LineageInfo(BaseModel): - tableInfo: TableInfo + tableInfo: Optional[Union[TableInfo, NoPermission]] = None + fileInfo: Optional[FileInfo] = None class TableLineage(BaseModel): diff --git a/metaphor/unity_catalog/utils.py b/metaphor/unity_catalog/utils.py index 8b433597..d37c6fc3 100644 --- a/metaphor/unity_catalog/utils.py +++ b/metaphor/unity_catalog/utils.py @@ -2,22 +2,20 @@ from pydantic import parse_obj_as from requests import HTTPError -from metaphor.common.logger import get_logger +from metaphor.common.logger import json_dump_to_debug_file from metaphor.unity_catalog.models import ColumnLineage, TableLineage -logger = get_logger() - def list_table_lineage(client: ApiClient, table_name: str) -> TableLineage: _data = {"table_name": table_name} + resp = None try: - return 
parse_obj_as( - TableLineage, - client.perform_query( - "GET", "/lineage-tracking/table-lineage", data=_data, version="2.0" - ), + resp = client.perform_query( + "GET", "/lineage-tracking/table-lineage", data=_data, version="2.0" ) + json_dump_to_debug_file(resp, f"table-lineage-{table_name}.json") + return parse_obj_as(TableLineage, resp) except HTTPError as e: # Lineage API returns 503 on GCP as it's not yet available if e.response.status_code == 503: @@ -33,12 +31,11 @@ def list_column_lineage( # Lineage API returns 503 on GCP as it's not yet available try: - return parse_obj_as( - ColumnLineage, - client.perform_query( - "GET", "/lineage-tracking/column-lineage", data=_data, version="2.0" - ), + resp = client.perform_query( + "GET", "/lineage-tracking/column-lineage", data=_data, version="2.0" ) + json_dump_to_debug_file(resp, f"column-lineage-{table_name}-{column_name}.json") + return parse_obj_as(ColumnLineage, resp) except HTTPError as e: # Lineage API returns 503 on GCP as it's not yet available if e.response.status_code == 503: diff --git a/poetry.lock b/poetry.lock index c7be12ee..5a212e65 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. 
[[package]] name = "anyio" @@ -679,13 +679,13 @@ tox = ["tox"] [[package]] name = "databricks-cli" -version = "0.17.7" +version = "0.18.0" description = "A command line interface for Databricks" optional = true -python-versions = "*" +python-versions = ">=3.7" files = [ - {file = "databricks-cli-0.17.7.tar.gz", hash = "sha256:5a545063449f3b9ad904644c0f251058485e29e564dedf8d4e4a7b45caf9549b"}, - {file = "databricks_cli-0.17.7-py2-none-any.whl", hash = "sha256:5b025943c70bbd374415264d38bfaddfb34ce070fadb083d851aec311e0f8901"}, + {file = "databricks-cli-0.18.0.tar.gz", hash = "sha256:87569709eda9af3e9db8047b691e420b5e980c62ef01675575c0d2b9b4211eb7"}, + {file = "databricks_cli-0.18.0-py2.py3-none-any.whl", hash = "sha256:1176a5f42d3e8af4abfc915446fb23abc44513e325c436725f5898cbb9e3384b"}, ] [package.dependencies] @@ -695,7 +695,7 @@ pyjwt = ">=1.7.0" requests = ">=2.17.3" six = ">=1.10.0" tabulate = ">=0.7.7" -urllib3 = ">=1.26.7,<2.0.0" +urllib3 = ">=1.26.7,<3" [[package]] name = "dataclasses-json" @@ -910,12 +910,12 @@ files = [ google-auth = ">=2.14.1,<3.0dev" googleapis-common-protos = ">=1.56.2,<2.0dev" grpcio = [ + {version = ">=1.33.2,<2.0dev", optional = true, markers = "extra == \"grpc\""}, {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, - {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, ] grpcio-status = [ + {version = ">=1.33.2,<2.0dev", optional = true, markers = "extra == \"grpc\""}, {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}, - {version = ">=1.33.2,<2.0dev", optional = true, markers = "python_version < \"3.11\" and extra == \"grpc\""}, ] protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0dev" requests = ">=2.18.0,<3.0.0dev" @@ -2930,7 +2930,8 
@@ files = [ {file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win32.whl", hash = "sha256:763d65baa3b952479c4e972669f679fe490eee058d5aa85da483ebae2009d231"}, {file = "ruamel.yaml.clib-0.2.7-cp310-cp310-win_amd64.whl", hash = "sha256:d000f258cf42fec2b1bbf2863c61d7b8918d31ffee905da62dede869254d3b8a"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:045e0626baf1c52e5527bd5db361bc83180faaba2ff586e763d3d5982a876a9e"}, - {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_12_6_arm64.whl", hash = "sha256:721bc4ba4525f53f6a611ec0967bdcee61b31df5a56801281027a3a6d1c2daf5"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:1a6391a7cabb7641c32517539ca42cf84b87b667bad38b78d4d42dd23e957c81"}, + {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux2014_aarch64.whl", hash = "sha256:9c7617df90c1365638916b98cdd9be833d31d337dbcd722485597b43c4a215bf"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.manylinux_2_24_x86_64.whl", hash = "sha256:41d0f1fa4c6830176eef5b276af04c89320ea616655d01327d5ce65e50575c94"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win32.whl", hash = "sha256:f6d3d39611ac2e4f62c3128a9eed45f19a6608670c5a2f4f07f24e8de3441d38"}, {file = "ruamel.yaml.clib-0.2.7-cp311-cp311-win_amd64.whl", hash = "sha256:da538167284de58a52109a9b89b8f6a53ff8437dd6dc26d33b57bf6699153122"}, @@ -3199,7 +3200,7 @@ files = [ ] [package.dependencies] -greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"aarch64\" or platform_machine == \"ppc64le\" or platform_machine == \"x86_64\" or platform_machine == \"amd64\" or platform_machine == \"AMD64\" or platform_machine == \"win32\" or platform_machine == \"WIN32\")"} +greenlet = {version = "!=0.4.17", markers = "python_version >= \"3\" and (platform_machine == \"win32\" or platform_machine == \"WIN32\" or platform_machine == \"AMD64\" or platform_machine == \"amd64\" or platform_machine == \"x86_64\" 
or platform_machine == \"ppc64le\" or platform_machine == \"aarch64\")"} [package.extras] aiomysql = ["aiomysql", "greenlet (!=0.4.17)"] @@ -3512,4 +3513,4 @@ unity-catalog = ["databricks-cli"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "0925c03ccd21d315140881359432d52e0afd12a74dbb6f21dda34aa9a6ce7afa" +content-hash = "564c0ff715391054414bd3a5ddca11f1a49a6fd52ede7027b20f6bd6250738b0" diff --git a/pyproject.toml b/pyproject.toml index 64620036..2867422d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.9" +version = "0.13.10" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] @@ -22,7 +22,7 @@ azure-mgmt-datafactory = { version = "^3.1.0", optional = true } boto3 = "^1.28.57" botocore = "^1.31.57" canonicaljson = "^2.0.0" -databricks-cli = { version = "^0.17.3", optional = true } +databricks-cli = { version = "^0.18.0", optional = true } GitPython = "^3.1.37" google-cloud-bigquery = { version = "^3.1.0", optional = true } google-cloud-logging = { version = "^3.5.0", optional = true } diff --git a/tests/unity_catalog/test_extractor.py b/tests/unity_catalog/test_extractor.py index 0e9d686c..2e37fae5 100644 --- a/tests/unity_catalog/test_extractor.py +++ b/tests/unity_catalog/test_extractor.py @@ -8,8 +8,10 @@ from metaphor.unity_catalog.extractor import UnityCatalogExtractor from metaphor.unity_catalog.models import ( ColumnLineage, + FileInfo, LineageColumnInfo, LineageInfo, + NoPermission, TableInfo, TableLineage, ) @@ -117,9 +119,15 @@ def mock_list_tables(catalog_name, schema_name, name_pattern): LineageInfo( tableInfo=TableInfo( name="upstream", catalog_name="db", schema_name="schema" - ) - ) - ] + ), + fileInfo=None, + ), + LineageInfo( + tableInfo=None, + fileInfo=FileInfo(path="s3://path", has_permission=True), + ), + 
LineageInfo(tableInfo=NoPermission(), fileInfo=None), + ], ), TableLineage(), ]