Skip to content

Commit

Permalink
Support Databricks shallow clone (#908)
Browse files Browse the repository at this point in the history
* Support shallow clone table

* Bump version

* update lock file

* Fix mypy error

* Bump version
  • Loading branch information
elic-eon authored Jul 18, 2024
1 parent a8086a0 commit b404da6
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 23 deletions.
26 changes: 21 additions & 5 deletions metaphor/unity_catalog/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,25 @@
)

TABLE_TYPE_MATERIALIZATION_TYPE_MAP = {
TableType.MANAGED: MaterializationType.TABLE,
TableType.EXTERNAL: MaterializationType.EXTERNAL,
TableType.VIEW: MaterializationType.VIEW,
TableType.EXTERNAL_SHALLOW_CLONE: MaterializationType.EXTERNAL,
TableType.FOREIGN: MaterializationType.EXTERNAL,
TableType.MANAGED: MaterializationType.TABLE,
TableType.MANAGED_SHALLOW_CLONE: MaterializationType.TABLE,
TableType.MATERIALIZED_VIEW: MaterializationType.MATERIALIZED_VIEW,
TableType.STREAMING_TABLE: MaterializationType.STREAM,
TableType.VIEW: MaterializationType.VIEW,
}

TABLE_TYPE_MAP = {
TableType.EXTERNAL: UnityCatalogTableType.EXTERNAL,
TableType.EXTERNAL_SHALLOW_CLONE: UnityCatalogTableType.EXTERNAL_SHALLOW_CLONE,
TableType.FOREIGN: UnityCatalogTableType.FOREIGN,
TableType.MANAGED: UnityCatalogTableType.MANAGED,
TableType.MANAGED_SHALLOW_CLONE: UnityCatalogTableType.MANAGED_SHALLOW_CLONE,
TableType.MATERIALIZED_VIEW: UnityCatalogTableType.MATERIALIZED_VIEW,
TableType.STREAMING_TABLE: UnityCatalogTableType.STREAMING_TABLE,
TableType.VIEW: UnityCatalogTableType.VIEW,
}

# For variable substitution in source URLs
Expand Down Expand Up @@ -241,6 +256,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:
table_name = table_info.name
schema_name = table_info.schema_name
database = table_info.catalog_name
table_type = table_info.table_type

normalized_name = dataset_normalized_name(database, schema_name, table_name)

Expand All @@ -253,7 +269,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:
database=database, schema=schema_name, table=table_name
)

if table_info.table_type is None:
if table_type is None:
raise ValueError(f"Invalid table {table_info.name}, no table_type found")

fields = []
Expand All @@ -269,7 +285,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:
fields=fields,
sql_schema=SQLSchema(
materialization=TABLE_TYPE_MATERIALIZATION_TYPE_MAP.get(
table_info.table_type, MaterializationType.TABLE
table_type, MaterializationType.TABLE
),
table_schema=(
table_info.view_definition if table_info.view_definition else None
Expand All @@ -289,7 +305,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:
dataset.unity_catalog = UnityCatalog(
dataset_type=UnityCatalogDatasetType.UNITY_CATALOG_TABLE,
table_info=UnityCatalogTableInfo(
type=UnityCatalogTableType[table_info.table_type.value],
type=TABLE_TYPE_MAP.get(table_type, UnityCatalogTableType.UNKNOWN),
data_source_format=(
table_info.data_source_format.value
if table_info.data_source_format is not None
Expand Down
39 changes: 27 additions & 12 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.40"
version = "0.14.41"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down Expand Up @@ -41,7 +41,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true }
llama-index-readers-notion = { version = "^0.1.6", optional = true }
looker-sdk = { version = "^24.2.0", optional = true }
lxml = { version = "~=5.0.0", optional = true }
metaphor-models = "0.36.1"
metaphor-models = "0.37.0"
more-itertools = { version = "^10.1.0", optional = true }
msal = { version = "^1.28.0", optional = true }
msgraph-beta-sdk = { version = "~1.4.0", optional = true }
Expand Down Expand Up @@ -160,6 +160,7 @@ pytest = "^8.1.1"
pytest-asyncio = "^0.23.5"
pytest-cov = "^4.0.0"
pytest-testmon = "^2.1.0"
pytest-snapshot = "^0.9.0"
testcontainers = "^3.7.1"
testcontainers-minio = "^0.0.1rc1"
types-attrs = "^19.1.0"
Expand Down
14 changes: 11 additions & 3 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import json
from datetime import datetime
from typing import Any, Iterator
from typing import Any, Iterator, Union

from metaphor.common.event_util import EventUtil
from metaphor.models.metadata_change_event import QueryLog, QueryLogs
from metaphor.common.event_util import ENTITY_TYPES, EventUtil
from metaphor.models.metadata_change_event import (
MetadataChangeEvent,
QueryLog,
QueryLogs,
)


def load_json(path):
Expand Down Expand Up @@ -70,3 +74,7 @@ def ignore_datetime_values( # noqa C901

def wrap_query_log_stream_to_event(logs: Iterator[QueryLog]):
return [EventUtil.build_then_trim(QueryLogs(logs=list(logs)))]


def serialize_event(event: Union[MetadataChangeEvent, ENTITY_TYPES]) -> str:
return "".join([json.dumps(EventUtil.trim_event(event), indent=2), "\n"])
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"logicalId": {
"name": "catalog.schema.table",
"platform": "UNITY_CATALOG"
},
"schema": {
"fields": [],
"schemaType": "SQL",
"sqlSchema": {
"materialization": "EXTERNAL"
}
},
"sourceInfo": {
"mainUrl": "http://foo.bar/catalog/schema/table"
},
"structure": {
"database": "catalog",
"schema": "schema",
"table": "table"
},
"systemTags": {
"tags": []
},
"unityCatalog": {
"datasetType": "UNITY_CATALOG_TABLE",
"tableInfo": {
"properties": [],
"type": "EXTERNAL_SHALLOW_CLONE"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"logicalId": {
"name": "catalog.schema.table",
"platform": "UNITY_CATALOG"
},
"schema": {
"fields": [],
"schemaType": "SQL",
"sqlSchema": {
"materialization": "TABLE"
}
},
"sourceInfo": {
"mainUrl": "http://foo.bar/catalog/schema/table"
},
"structure": {
"database": "catalog",
"schema": "schema",
"table": "table"
},
"systemTags": {
"tags": []
},
"unityCatalog": {
"datasetType": "UNITY_CATALOG_TABLE",
"tableInfo": {
"properties": [],
"type": "MANAGED_SHALLOW_CLONE"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
{
"logicalId": {
"name": "catalog.schema.table",
"platform": "UNITY_CATALOG"
},
"schema": {
"description": "example",
"fields": [
{
"description": "some description",
"fieldName": "col1",
"fieldPath": "col1",
"nativeType": "int",
"precision": 32.0
}
],
"schemaType": "SQL",
"sqlSchema": {
"materialization": "TABLE"
}
},
"sourceInfo": {
"createdAtSource": "1970-01-01T00:00:00+00:00",
"lastUpdated": "1970-01-01T00:00:00+00:00",
"mainUrl": "http://foo.bar/catalog/schema/table"
},
"structure": {
"database": "catalog",
"schema": "schema",
"table": "table"
},
"systemTags": {
"tags": []
},
"unityCatalog": {
"datasetType": "UNITY_CATALOG_TABLE",
"tableInfo": {
"dataSourceFormat": "CSV",
"owner": "[email protected]",
"properties": [
{
"key": "delta.lastCommitTimestamp",
"value": "\"1664444422000\""
}
],
"storageLocation": "s3://path",
"type": "MANAGED"
}
}
}
Loading

0 comments on commit b404da6

Please sign in to comment.