Skip to content

Commit

Permalink
Move UC-specific metadata from CustomMetadata to dedicated fields
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan committed Oct 21, 2023
1 parent 4d9c518 commit 5090167
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 60 deletions.
5 changes: 5 additions & 0 deletions metaphor/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def safe_parse_ISO8601(iso8601_str: Optional[str]) -> Optional[datetime]:
return None


def safe_str(value: Any) -> Optional[str]:
    """Return ``str(value)``, passing ``None`` through unchanged.

    Only ``None`` is special-cased; other falsy values such as ``0`` or ``""``
    are still converted with ``str``.
    """
    if value is None:
        return None
    return str(value)


def safe_float(value: Optional[Union[float, int, str]]) -> Optional[float]:
"""Converts a value to float, return None if the original value is None or NaN or INF"""
return (
Expand Down
30 changes: 25 additions & 5 deletions metaphor/unity_catalog/extractor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import json
import logging
import urllib.parse
from typing import Collection, Dict, Generator, List

from databricks_cli.sdk.api_client import ApiClient
Expand All @@ -12,22 +14,25 @@
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.filter import DatasetFilter
from metaphor.common.logger import get_logger, json_dump_to_debug_file
from metaphor.common.utils import unique_list
from metaphor.common.utils import safe_str, unique_list
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
CustomMetadata,
DataPlatform,
Dataset,
DatasetLogicalID,
DatasetSchema,
DatasetStructure,
DatasetUpstream,
FieldMapping,
KeyValuePair,
MaterializationType,
SchemaField,
SchemaType,
SourceField,
SourceInfo,
SQLSchema,
UnityCatalog,
UnityCatalogTableType,
)
from metaphor.unity_catalog.config import UnityCatalogRunConfig
from metaphor.unity_catalog.models import (
Expand All @@ -48,10 +53,11 @@
}
)

TABLE_TYPE_MAP = {
TABLE_TYPE_MATERIALIZATION_TYPE_MAP = {
TableType.MANAGED: MaterializationType.TABLE,
TableType.EXTERNAL: MaterializationType.EXTERNAL,
TableType.VIEW: MaterializationType.VIEW,
TableType.MATERIALIZED_VIEW: MaterializationType.MATERIALIZED_VIEW,
}


Expand Down Expand Up @@ -159,14 +165,28 @@ def _init_dataset(self, table: Table) -> Dataset:
for column in table.columns
],
sql_schema=SQLSchema(
materialization=TABLE_TYPE_MAP.get(
materialization=TABLE_TYPE_MATERIALIZATION_TYPE_MAP.get(
table.table_type, MaterializationType.TABLE
),
table_schema=table.view_definition if table.view_definition else None,
),
)

dataset.custom_metadata = CustomMetadata(metadata=table.extra_metadata())
path = urllib.parse.quote(
f"/explore/data/{database}/{schema_name}/{table_name}"
)
dataset.source_info = SourceInfo(main_url=f"{self._host}{path}")

dataset.unity_catalog = UnityCatalog(
table_type=UnityCatalogTableType[table.table_type],
data_source_format=safe_str(table.data_source_format),
storage_location=table.storage_location,
owner=table.owner,
properties=[
KeyValuePair(key=k, value=json.dumps(v))
for k, v in table.properties.items()
],
)

self._datasets[normalized_name] = dataset

Expand Down
24 changes: 6 additions & 18 deletions metaphor/unity_catalog/models.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
import json
from enum import Enum
from typing import List, Optional, Union

from pydantic import BaseModel, parse_obj_as, validator

from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import CustomMetadataItem

logger = get_logger()

Expand All @@ -21,6 +19,8 @@ class TableType(str, Enum):
MANAGED = "MANAGED"
EXTERNAL = "EXTERNAL"
VIEW = "VIEW"
MATERIALIZED_VIEW = "MATERIALIZED_VIEW"
STREAMING_TABLE = "STREAMING_TABLE"


class DataSourceFormat(str, Enum):
Expand All @@ -36,6 +36,9 @@ class DataSourceFormat(str, Enum):
)
DELTASHARING = "DELTASHARING" # a Table shared through the Delta Sharing protocol

def __str__(self) -> str:
    """Return ``str(self.value)``, i.e. the member's value rather than its name."""
    return str(self.value)


class Table(BaseModel):
catalog_name: str
Expand All @@ -45,7 +48,7 @@ class Table(BaseModel):
generation: Optional[int]
name: str
owner: str
properties: object
properties: dict
schema_name: str
storage_location: Optional[str]
sql_path: Optional[str]
Expand All @@ -54,21 +57,6 @@ class Table(BaseModel):
updated_by: str
view_definition: Optional[str]

def extra_metadata(self) -> List[CustomMetadataItem]:
    """Build custom-metadata items from a fixed set of this table's attributes.

    Each listed attribute whose value is truthy becomes one item: the key is
    the attribute name and the value is the attribute JSON-encoded with
    ``json.dumps``. Missing or falsy attributes are left out.
    """
    attribute_names = (
        "data_source_format",
        "generation",
        "owner",
        "properties",
        "storage_location",
        "sql_path",
        "table_type",
    )

    items = []
    for attribute_name in attribute_names:
        # Same truthiness test as the original filter: absent or falsy is skipped.
        if not getattr(self, attribute_name, None):
            continue
        items.append(
            CustomMetadataItem(
                key=attribute_name,
                value=json.dumps(getattr(self, attribute_name)),
            )
        )
    return items


def parse_table_from_object(obj: object) -> Table:
    """Parse ``obj`` into a ``Table`` by delegating to pydantic's ``parse_obj_as``."""
    return parse_obj_as(Table, obj)
Expand Down
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.13.17"
version = "0.13.18"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -29,7 +29,7 @@ google-cloud-logging = { version = "^3.5.0", optional = true }
jsonschema = "^4.18.6"
lkml = { version = "^1.3.1", optional = true }
looker-sdk = { version = "^23.6.0", optional = true }
metaphor-models = "0.27.7"
metaphor-models = "0.27.9"
msal = { version = "^1.20.0", optional = true }
oscrypto = { git = "https://github.com/wbond/oscrypto.git", rev = "1547f53" } # Until oscrypto 1.3.1 is released: https://github.com/wbond/oscrypto/issues/78
pycarlo = { version = "^0.8.1", optional = true }
Expand Down
7 changes: 7 additions & 0 deletions tests/common/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
safe_float,
safe_int,
safe_parse_ISO8601,
safe_str,
start_of_day,
unique_list,
)
Expand Down Expand Up @@ -120,6 +121,12 @@ def test_safe_parse_ISO8061():
assert safe_parse_ISO8601("isvalid") is None


def test_safe_str():
    """safe_str keeps None as None and stringifies any other value."""
    # None must come back as the None singleton, not the string "None".
    none_result = safe_str(None)
    assert none_result is None

    # Non-None inputs paired with the string each should become.
    expected_by_input = [
        ("string", "string"),
        (100, "100"),
    ]
    for raw, expected in expected_by_input:
        assert safe_str(raw) == expected


def test_safe_float():
assert safe_float(None) is None
assert safe_float(float("NaN")) is None
Expand Down
100 changes: 70 additions & 30 deletions tests/unity_catalog/expected.json
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
[
{
"customMetadata": {
"metadata": [
"unityCatalog": {
"tableType": "MANAGED",
"dataSourceFormat": "CSV",
"owner": "[email protected]",
"storageLocation": "s3://path",
"properties": [
{
"key": "data_source_format",
"value": "\"CSV\""
},
{
"key": "owner",
"value": "\"[email protected]\""
},
{
"key": "properties",
"value": "{\"delta.lastCommitTimestamp\": \"1664444422000\"}"
},
{
"key": "storage_location",
"value": "\"s3://path\""
},
{
"key": "table_type",
"value": "\"MANAGED\""
"key": "delta.lastCommitTimestamp",
"value": "\"1664444422000\""
}
]
},
Expand Down Expand Up @@ -68,26 +56,75 @@
"DATASET~4B3CF34E5B62D97FAF33F75C7B32BB84",
"DATASET~97D032124F4B526411F0D04797CEAC96"
]
},
"sourceInfo": {
"mainUrl": "http://dummy.host/explore/data/catalog/schema/table"
}
},
{
"customMetadata": {
"metadata": [
"unityCatalog": {
"tableType": "VIEW",
"owner": "[email protected]",
"properties": [
{
"key": "view.catalogAndNamespace.numParts",
"value": "\"2\""
},
{
"key": "view.sqlConfig.spark.sql.hive.convertCTAS",
"value": "\"true\""
},
{
"key": "view.query.out.col.0",
"value": "\"key\""
},
{
"key": "view.sqlConfig.spark.sql.parquet.compression.codec",
"value": "\"snappy\""
},
{
"key": "view.query.out.numCols",
"value": "\"3\""
},
{
"key": "view.referredTempViewNames",
"value": "\"[]\""
},
{
"key": "view.query.out.col.1",
"value": "\"values\""
},
{
"key": "view.sqlConfig.spark.sql.streaming.stopTimeout",
"value": "\"15s\""
},
{
"key": "view.catalogAndNamespace.part.0",
"value": "\"catalog\""
},
{
"key": "view.sqlConfig.spark.sql.sources.commitProtocolClass",
"value": "\"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\""
},
{
"key": "view.sqlConfig.spark.sql.sources.default",
"value": "\"delta\""
},
{
"key": "generation",
"value": "1"
"key": "view.sqlConfig.spark.sql.legacy.createHiveTableByDefault",
"value": "\"false\""
},
{
"key": "owner",
"value": "\"[email protected]\""
"key": "view.query.out.col.2",
"value": "\"nested_values\""
},
{
"key": "properties",
"value": "{\"view.catalogAndNamespace.numParts\": \"2\", \"view.sqlConfig.spark.sql.hive.convertCTAS\": \"true\", \"view.query.out.col.0\": \"key\", \"view.sqlConfig.spark.sql.parquet.compression.codec\": \"snappy\", \"view.query.out.numCols\": \"3\", \"view.referredTempViewNames\": \"[]\", \"view.query.out.col.1\": \"values\", \"view.sqlConfig.spark.sql.streaming.stopTimeout\": \"15s\", \"view.catalogAndNamespace.part.0\": \"catalog\", \"view.sqlConfig.spark.sql.sources.commitProtocolClass\": \"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\", \"view.sqlConfig.spark.sql.sources.default\": \"delta\", \"view.sqlConfig.spark.sql.legacy.createHiveTableByDefault\": \"false\", \"view.query.out.col.2\": \"nested_values\", \"view.referredTempFunctionsNames\": \"[]\", \"view.catalogAndNamespace.part.1\": \"default\"}"
"key": "view.referredTempFunctionsNames",
"value": "\"[]\""
},
{
"key": "table_type",
"value": "\"VIEW\""
"key": "view.catalogAndNamespace.part.1",
"value": "\"default\""
}
]
},
Expand Down Expand Up @@ -115,6 +152,9 @@
"database": "catalog",
"schema": "schema",
"table": "view"
},
"sourceInfo": {
"mainUrl": "http://dummy.host/explore/data/catalog/schema/view"
}
}
]
2 changes: 1 addition & 1 deletion tests/unity_catalog/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

def dummy_config():
return UnityCatalogRunConfig(
host="",
host="http://dummy.host",
token="",
output=OutputConfig(),
)
Expand Down

0 comments on commit 5090167

Please sign in to comment.