From 5090167b70484e692ba804db9a4402f55dbc8869 Mon Sep 17 00:00:00 2001 From: Mars Lan Date: Sat, 21 Oct 2023 03:17:52 -0700 Subject: [PATCH] Move UC-specific metadata from CustomMetadata to dedicated fields --- metaphor/common/utils.py | 5 ++ metaphor/unity_catalog/extractor.py | 30 ++++++-- metaphor/unity_catalog/models.py | 24 ++----- poetry.lock | 8 +-- pyproject.toml | 4 +- tests/common/test_utils.py | 7 ++ tests/unity_catalog/expected.json | 100 ++++++++++++++++++-------- tests/unity_catalog/test_extractor.py | 2 +- 8 files changed, 120 insertions(+), 60 deletions(-) diff --git a/metaphor/common/utils.py b/metaphor/common/utils.py index 99251d2a..b2346e7d 100644 --- a/metaphor/common/utils.py +++ b/metaphor/common/utils.py @@ -65,6 +65,11 @@ def safe_parse_ISO8601(iso8601_str: Optional[str]) -> Optional[datetime]: return None +def safe_str(value: Any) -> Optional[str]: + """Converts a value to str, return None if the original value is None""" + return None if value is None else str(value) + + def safe_float(value: Optional[Union[float, int, str]]) -> Optional[float]: """Converts a value to float, return None if the original value is None or NaN or INF""" return ( diff --git a/metaphor/unity_catalog/extractor.py b/metaphor/unity_catalog/extractor.py index da343778..45feddf0 100644 --- a/metaphor/unity_catalog/extractor.py +++ b/metaphor/unity_catalog/extractor.py @@ -1,4 +1,6 @@ +import json import logging +import urllib.parse from typing import Collection, Dict, Generator, List from databricks_cli.sdk.api_client import ApiClient @@ -12,10 +14,9 @@ from metaphor.common.event_util import ENTITY_TYPES from metaphor.common.filter import DatasetFilter from metaphor.common.logger import get_logger, json_dump_to_debug_file -from metaphor.common.utils import unique_list +from metaphor.common.utils import safe_str, unique_list from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( - CustomMetadata, DataPlatform, Dataset, DatasetLogicalID, @@ -23,11 +24,15 @@ DatasetStructure, DatasetUpstream, FieldMapping, + KeyValuePair, MaterializationType, SchemaField, SchemaType, SourceField, + SourceInfo, SQLSchema, + UnityCatalog, + UnityCatalogTableType, ) from metaphor.unity_catalog.config import UnityCatalogRunConfig from metaphor.unity_catalog.models import ( @@ -48,10 +53,11 @@ } ) -TABLE_TYPE_MAP = { +TABLE_TYPE_MATERIALIZATION_TYPE_MAP = { TableType.MANAGED: MaterializationType.TABLE, TableType.EXTERNAL: MaterializationType.EXTERNAL, TableType.VIEW: MaterializationType.VIEW, + TableType.MATERIALIZED_VIEW: MaterializationType.MATERIALIZED_VIEW, } @@ -159,14 +165,28 @@ def _init_dataset(self, table: Table) -> Dataset: for column in table.columns ], sql_schema=SQLSchema( - materialization=TABLE_TYPE_MAP.get( + materialization=TABLE_TYPE_MATERIALIZATION_TYPE_MAP.get( table.table_type, MaterializationType.TABLE ), table_schema=table.view_definition if table.view_definition else None, ), ) - dataset.custom_metadata = CustomMetadata(metadata=table.extra_metadata()) + path = urllib.parse.quote( + f"/explore/data/{database}/{schema_name}/{table_name}" + ) + dataset.source_info = SourceInfo(main_url=f"{self._host}{path}") + + dataset.unity_catalog = UnityCatalog( + table_type=UnityCatalogTableType[table.table_type], + data_source_format=safe_str(table.data_source_format), + storage_location=table.storage_location, + owner=table.owner, + properties=[ + KeyValuePair(key=k, value=json.dumps(v)) + for k, v in table.properties.items() + ], + ) self._datasets[normalized_name] = dataset diff --git a/metaphor/unity_catalog/models.py b/metaphor/unity_catalog/models.py index cf865035..e97c5726 100644 --- a/metaphor/unity_catalog/models.py +++ b/metaphor/unity_catalog/models.py @@ -1,11 +1,9 @@ -import json from enum import Enum from typing import List, Optional, Union from pydantic import BaseModel, parse_obj_as, validator from metaphor.common.logger import get_logger -from metaphor.models.metadata_change_event import CustomMetadataItem logger = get_logger() @@ -21,6 +19,8 @@ class TableType(str, Enum): MANAGED = "MANAGED" EXTERNAL = "EXTERNAL" VIEW = "VIEW" + MATERIALIZED_VIEW = "MATERIALIZED_VIEW" + STREAMING_TABLE = "STREAMING_TABLE" class DataSourceFormat(str, Enum): @@ -36,6 +36,9 @@ class DataSourceFormat(str, Enum): ) DELTASHARING = "DELTASHARING" # a Table shared through the Delta Sharing protocol + def __str__(self): + return str(self.value) + class Table(BaseModel): catalog_name: str @@ -45,7 +48,7 @@ class Table(BaseModel): generation: Optional[int] name: str owner: str - properties: object + properties: dict schema_name: str storage_location: Optional[str] sql_path: Optional[str] @@ -54,21 +57,6 @@ class Table(BaseModel): updated_by: str view_definition: Optional[str] - def extra_metadata(self) -> List[CustomMetadataItem]: - properties = [ - "data_source_format", - "generation", - "owner", - "properties", - "storage_location", - "sql_path", - "table_type", - ] - return [ - CustomMetadataItem(key=p, value=json.dumps(getattr(self, p))) - for p in filter(lambda p: getattr(self, p, None), properties) - ] - def parse_table_from_object(obj: object): return parse_obj_as(Table, obj) diff --git a/poetry.lock b/poetry.lock index 712caa47..19cff728 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1787,13 +1787,13 @@ files = [ [[package]] name = "metaphor-models" -version = "0.27.7" +version = "0.27.9" description = "" optional = false python-versions = ">=3.8,<4.0" files = [ - {file = "metaphor_models-0.27.7-py3-none-any.whl", hash = "sha256:827b005ee945fb3429e8d8a43bf023749c69ed428f2c212285e34fc9a417ad9a"}, - {file = "metaphor_models-0.27.7.tar.gz", hash = "sha256:096e2375032f26dfcdff456166650300aaf6dd4ae33a9ef93b40ddbb1bb5eca6"}, + {file = "metaphor_models-0.27.9-py3-none-any.whl", hash = "sha256:ac1079765ad1827c9765c653178a40850e8162b94363b89828bdee41ba98e61d"}, + {file = "metaphor_models-0.27.9.tar.gz", hash = "sha256:e24b9311728b99d5536975bbc32889f29ae29d09791eb2e4491b69e69f7a7e6c"}, ] [[package]] @@ -3513,4 +3513,4 @@ unity-catalog = ["databricks-cli"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "9fed5e83c8f872e0a2ca840a59693bda48bd66972e6234f6217202dc7159c25a" +content-hash = "bc9a23189eac9829db2335e97c552e983d5459e6b42e955ae0b9feb9dcb03ea1" diff --git a/pyproject.toml b/pyproject.toml index de94375e..feb55f57 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.17" +version = "0.13.18" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] @@ -29,7 +29,7 @@ google-cloud-logging = { version = "^3.5.0", optional = true } jsonschema = "^4.18.6" lkml = { version = "^1.3.1", optional = true } looker-sdk = { version = "^23.6.0", optional = true } -metaphor-models = "0.27.7" +metaphor-models = "0.27.9" msal = { version = "^1.20.0", optional = true } oscrypto = { git = "https://github.com/wbond/oscrypto.git", rev = "1547f53" } # Until oscrypto 1.3.1 is release: https://github.com/wbond/oscrypto/issues/78 pycarlo = { version = "^0.8.1", optional = true } diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index 995ae0cc..434484fc 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -14,6 +14,7 @@ safe_float, safe_int, safe_parse_ISO8601, + safe_str, start_of_day, unique_list, ) @@ -120,6 +121,12 @@ def test_safe_parse_ISO8061(): assert safe_parse_ISO8601("isvalid") is None +def test_safe_str(): + assert safe_str(None) is None + assert safe_str("string") == "string" + assert safe_str(100) == "100" + + def test_safe_float(): assert safe_float(None) is None assert safe_float(float("NaN")) is None diff --git a/tests/unity_catalog/expected.json b/tests/unity_catalog/expected.json index cd1f4ded..b597b269 100644 --- a/tests/unity_catalog/expected.json +++ b/tests/unity_catalog/expected.json @@ -1,26 +1,14 @@ [ { - "customMetadata": { - "metadata": [ + "unityCatalog": { + "tableType": "MANAGED", + "dataSourceFormat": "CSV", + "owner": "foo@bar.com", + "storageLocation": "s3://path", + "properties": [ { - "key": "data_source_format", - "value": "\"CSV\"" - }, - { - "key": "owner", - "value": "\"foo@bar.com\"" - }, - { - "key": "properties", - "value": "{\"delta.lastCommitTimestamp\": \"1664444422000\"}" - }, - { - "key": "storage_location", - "value": "\"s3://path\"" - }, - { - "key": "table_type", - "value": "\"MANAGED\"" + "key": "delta.lastCommitTimestamp", + "value": "\"1664444422000\"" } ] }, @@ -68,26 +56,75 @@ "DATASET~4B3CF34E5B62D97FAF33F75C7B32BB84", "DATASET~97D032124F4B526411F0D04797CEAC96" ] + }, + "sourceInfo": { + "mainUrl": "http://dummy.host/explore/data/catalog/schema/table" } }, { - "customMetadata": { - "metadata": [ + "unityCatalog": { + "tableType": "VIEW", + "owner": "foo@bar.com", + "properties": [ + { + "key": "view.catalogAndNamespace.numParts", + "value": "\"2\"" + }, + { + "key": "view.sqlConfig.spark.sql.hive.convertCTAS", + "value": "\"true\"" + }, + { + "key": "view.query.out.col.0", + "value": "\"key\"" + }, + { + "key": "view.sqlConfig.spark.sql.parquet.compression.codec", + "value": "\"snappy\"" + }, + { + "key": "view.query.out.numCols", + "value": "\"3\"" + }, + { + "key": "view.referredTempViewNames", + "value": "\"[]\"" + }, + { + "key": "view.query.out.col.1", + "value": "\"values\"" + }, + { + "key": "view.sqlConfig.spark.sql.streaming.stopTimeout", + "value": "\"15s\"" + }, + { + "key": "view.catalogAndNamespace.part.0", + "value": "\"catalog\"" + }, + { + "key": "view.sqlConfig.spark.sql.sources.commitProtocolClass", + "value": "\"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\"" + }, + { + "key": "view.sqlConfig.spark.sql.sources.default", + "value": "\"delta\"" + }, { - "key": "generation", - "value": "1" + "key": "view.sqlConfig.spark.sql.legacy.createHiveTableByDefault", + "value": "\"false\"" }, { - "key": "owner", - "value": "\"foo@bar.com\"" + "key": "view.query.out.col.2", + "value": "\"nested_values\"" }, { - "key": "properties", - "value": "{\"view.catalogAndNamespace.numParts\": \"2\", \"view.sqlConfig.spark.sql.hive.convertCTAS\": \"true\", \"view.query.out.col.0\": \"key\", \"view.sqlConfig.spark.sql.parquet.compression.codec\": \"snappy\", \"view.query.out.numCols\": \"3\", \"view.referredTempViewNames\": \"[]\", \"view.query.out.col.1\": \"values\", \"view.sqlConfig.spark.sql.streaming.stopTimeout\": \"15s\", \"view.catalogAndNamespace.part.0\": \"catalog\", \"view.sqlConfig.spark.sql.sources.commitProtocolClass\": \"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\", \"view.sqlConfig.spark.sql.sources.default\": \"delta\", \"view.sqlConfig.spark.sql.legacy.createHiveTableByDefault\": \"false\", \"view.query.out.col.2\": \"nested_values\", \"view.referredTempFunctionsNames\": \"[]\", \"view.catalogAndNamespace.part.1\": \"default\"}" + "key": "view.referredTempFunctionsNames", + "value": "\"[]\"" }, { - "key": "table_type", - "value": "\"VIEW\"" + "key": "view.catalogAndNamespace.part.1", + "value": "\"default\"" } ] }, @@ -115,6 +152,9 @@ "database": "catalog", "schema": "schema", "table": "view" + }, + "sourceInfo": { + "mainUrl": "http://dummy.host/explore/data/catalog/schema/view" } } ] diff --git a/tests/unity_catalog/test_extractor.py b/tests/unity_catalog/test_extractor.py index d41e09d8..0f143dd5 100644 --- a/tests/unity_catalog/test_extractor.py +++ b/tests/unity_catalog/test_extractor.py @@ -20,7 +20,7 @@ def dummy_config(): return UnityCatalogRunConfig( - host="", + host="http://dummy.host", token="", output=OutputConfig(), )