Skip to content

Commit

Permalink
Fix Unity Catalog parsing erros due to permission issues (#634)
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan authored Oct 17, 2023
1 parent a6d30bc commit 511866e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 38 deletions.
26 changes: 23 additions & 3 deletions metaphor/unity_catalog/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
)
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger
from metaphor.common.logger import get_logger, json_dump_to_debug_file
from metaphor.common.utils import unique_list
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
Expand All @@ -30,7 +30,12 @@
SQLSchema,
)
from metaphor.unity_catalog.config import UnityCatalogRunConfig
from metaphor.unity_catalog.models import Table, TableType, parse_table_from_object
from metaphor.unity_catalog.models import (
NoPermission,
Table,
TableType,
parse_table_from_object,
)
from metaphor.unity_catalog.utils import list_column_lineage, list_table_lineage

logger = get_logger()
Expand Down Expand Up @@ -97,6 +102,8 @@ async def extract(self) -> Collection[ENTITY_TYPES]:

def _get_catalogs(self) -> List[str]:
response = self._api.list_catalogs()
json_dump_to_debug_file(response, "list-catalogs.json")

catalogs = []
for catalog in response.get("catalogs", []):
if "name" in catalog:
Expand All @@ -105,6 +112,8 @@ def _get_catalogs(self) -> List[str]:

def _get_schemas(self, catalog: str) -> List[str]:
response = self._api.list_schemas(catalog_name=catalog, name_pattern=None)
json_dump_to_debug_file(response, f"list-schemas-{catalog}.json")

schemas = []
for schema in response.get("schemas", []):
if "name" in schema:
Expand All @@ -115,6 +124,7 @@ def _get_tables(self, catalog: str, schema: str) -> Generator[Table, None, None]
response = self._api.list_tables(
catalog_name=catalog, schema_name=schema, name_pattern=None
)
json_dump_to_debug_file(response, f"list-tables-{catalog}-{schema}.json")
for table in response.get("tables", []):
yield parse_table_from_object(table)

Expand Down Expand Up @@ -168,11 +178,12 @@ def _populate_lineage(self, dataset: Dataset):

# Skip table without upstream
if not lineage.upstreams:
logging.info(f"Skip table: {table_name} for no upstream")
logging.info(f"Table {table_name} has no upstream")
return

source_datasets = []
field_mappings = []
has_permission_issues = False
for field in dataset.schema.fields:
column_name = field.field_name
column_lineage = list_column_lineage(
Expand All @@ -181,6 +192,10 @@ def _populate_lineage(self, dataset: Dataset):

field_mapping = FieldMapping(destination=column_name, sources=[])
for upstream_col in column_lineage.upstream_cols:
if isinstance(upstream_col, NoPermission):
has_permission_issues = True
continue

normalized_name = dataset_normalized_name(
upstream_col.catalog_name,
upstream_col.schema_name,
Expand All @@ -202,6 +217,11 @@ def _populate_lineage(self, dataset: Dataset):
)
field_mappings.append(field_mapping)

if has_permission_issues:
logger.error(
f"Unable to extract lineage for {table_name} due to permission issues"
)

dataset.upstream = DatasetUpstream(
source_datasets=unique_list(source_datasets), field_mappings=field_mappings
)
Expand Down
28 changes: 22 additions & 6 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import json
from enum import Enum
from typing import List, Optional
from typing import List, Optional, Union

from pydantic import BaseModel, parse_obj_as
from pydantic import BaseModel, parse_obj_as, validator

from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import CustomMetadataItem
Expand Down Expand Up @@ -71,10 +71,20 @@ def extra_metadata(self) -> List[CustomMetadataItem]:


def parse_table_from_object(obj: object):
logger.debug(f"table object: {json.dumps(obj)}")
return parse_obj_as(Table, obj)


class NoPermission(BaseModel):
has_permission: bool = False

@validator("has_permission")
def has_permission_must_be_false(cls, value):
if value is False:
return value

raise ValueError("has_permission must be False")


class LineageColumnInfo(BaseModel):
name: str
catalog_name: str
Expand All @@ -83,8 +93,13 @@ class LineageColumnInfo(BaseModel):


class ColumnLineage(BaseModel):
upstream_cols: List[LineageColumnInfo] = []
downstream_cols: List[LineageColumnInfo] = []
upstream_cols: List[Union[LineageColumnInfo, NoPermission]] = []
downstream_cols: List[Union[LineageColumnInfo, NoPermission]] = []


class FileInfo(BaseModel):
path: str
has_permission: bool


class TableInfo(BaseModel):
Expand All @@ -94,7 +109,8 @@ class TableInfo(BaseModel):


class LineageInfo(BaseModel):
tableInfo: TableInfo
tableInfo: Optional[Union[TableInfo, NoPermission]] = None
fileInfo: Optional[FileInfo] = None


class TableLineage(BaseModel):
Expand Down
23 changes: 10 additions & 13 deletions metaphor/unity_catalog/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,20 @@
from pydantic import parse_obj_as
from requests import HTTPError

from metaphor.common.logger import get_logger
from metaphor.common.logger import json_dump_to_debug_file
from metaphor.unity_catalog.models import ColumnLineage, TableLineage

logger = get_logger()


def list_table_lineage(client: ApiClient, table_name: str) -> TableLineage:
_data = {"table_name": table_name}
resp = None

try:
return parse_obj_as(
TableLineage,
client.perform_query(
"GET", "/lineage-tracking/table-lineage", data=_data, version="2.0"
),
resp = client.perform_query(
"GET", "/lineage-tracking/table-lineage", data=_data, version="2.0"
)
json_dump_to_debug_file(resp, f"table-lineage-{table_name}.json")
return parse_obj_as(TableLineage, resp)
except HTTPError as e:
# Lineage API returns 503 on GCP as it's not yet available
if e.response.status_code == 503:
Expand All @@ -33,12 +31,11 @@ def list_column_lineage(

# Lineage API returns 503 on GCP as it's not yet available
try:
return parse_obj_as(
ColumnLineage,
client.perform_query(
"GET", "/lineage-tracking/column-lineage", data=_data, version="2.0"
),
resp = client.perform_query(
"GET", "/lineage-tracking/column-lineage", data=_data, version="2.0"
)
json_dump_to_debug_file(resp, f"column-lineage-{table_name}-{column_name}.json")
return parse_obj_as(ColumnLineage, resp)
except HTTPError as e:
# Lineage API returns 503 on GCP as it's not yet available
if e.response.status_code == 503:
Expand Down
23 changes: 12 additions & 11 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.13.9"
version = "0.13.10"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand All @@ -22,7 +22,7 @@ azure-mgmt-datafactory = { version = "^3.1.0", optional = true }
boto3 = "^1.28.57"
botocore = "^1.31.57"
canonicaljson = "^2.0.0"
databricks-cli = { version = "^0.17.3", optional = true }
databricks-cli = { version = "^0.18.0", optional = true }
GitPython = "^3.1.37"
google-cloud-bigquery = { version = "^3.1.0", optional = true }
google-cloud-logging = { version = "^3.5.0", optional = true }
Expand Down
14 changes: 11 additions & 3 deletions tests/unity_catalog/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
from metaphor.unity_catalog.extractor import UnityCatalogExtractor
from metaphor.unity_catalog.models import (
ColumnLineage,
FileInfo,
LineageColumnInfo,
LineageInfo,
NoPermission,
TableInfo,
TableLineage,
)
Expand Down Expand Up @@ -117,9 +119,15 @@ def mock_list_tables(catalog_name, schema_name, name_pattern):
LineageInfo(
tableInfo=TableInfo(
name="upstream", catalog_name="db", schema_name="schema"
)
)
]
),
fileInfo=None,
),
LineageInfo(
tableInfo=None,
fileInfo=FileInfo(path="s3://path", has_permission=True),
),
LineageInfo(tableInfo=NoPermission(), fileInfo=None),
],
),
TableLineage(),
]
Expand Down

0 comments on commit 511866e

Please sign in to comment.