Skip to content

Commit

Permalink
Refactor config for Unity Catalog connector (#857)
Browse files Browse the repository at this point in the history
* Refactor config for Unity Catalog connector

* Use f-string for concatenation

* Fix tests

* Use full URL to create API clients
  • Loading branch information
mars-lan authored May 7, 2024
1 parent 5944033 commit c4e024d
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 94 deletions.
18 changes: 6 additions & 12 deletions metaphor/unity_catalog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ Create a YAML config file based on the following template.
### Required Configurations

```yaml
host: <workspace_url>
hostname: <cluster_or_warehouse_hostname>
http_path: <http_path>
token: <access_token>
```
See [this page](https://docs.databricks.com/en/integrations/compute-details.html) for details on how to find the values for `hosthost` and `http_path`.

### Optional Configurations

#### Output Destination
Expand All @@ -31,19 +34,10 @@ See [Output Config](../common/docs/output.md) for more information.

See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config.

#### Cluster Path

To run the queries using a compute cluster, add its cluster path in the configuration file:

```yaml
cluster_path: <cluster_path>
```

You can find the cluster path in your Databricks workspace by following the [Databricks documentation](https://docs.databricks.com/en/integrations/compute-details.html) instructions.

#### Source URL

By default, each table is associated with a Unity Catalog URL derived from the `host` config.
By default, each table is associated with a Unity Catalog URL derived from the `hostname` config.

You can override this by specifying your own URL built from the catalog, schema, and table names:

```yaml
Expand Down
12 changes: 5 additions & 7 deletions metaphor/unity_catalog/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ class UnityCatalogQueryLogConfig:

@dataclass(config=ConnectorConfig)
class UnityCatalogRunConfig(BaseConfig):
host: str
# cluster/warehouse hostname & HTTP path
hostname: str
http_path: str

# API token
token: str

# Override the URL for each dataset
Expand All @@ -31,12 +35,6 @@ class UnityCatalogRunConfig(BaseConfig):
# Include or exclude specific databases/schemas/tables
filter: DatasetFilter = field(default_factory=lambda: DatasetFilter())

# The id of warehouse which will run the sql
warehouse_id: Optional[str] = None

# cluster http path
cluster_path: Optional[str] = None

# configs for fetching query logs
query_log: UnityCatalogQueryLogConfig = field(
default_factory=lambda: UnityCatalogQueryLogConfig()
Expand Down
26 changes: 11 additions & 15 deletions metaphor/unity_catalog/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,17 @@ def from_config_file(config_file: str) -> "UnityCatalogExtractor":

def __init__(self, config: UnityCatalogRunConfig):
super().__init__(config)
self._host = config.host
self._token = config.token
self._source_url = config.source_url
self._api = create_api(self._host, self._token)
self._source_url = (
f"https://{config.hostname}/explore/data/{{catalog}}/{{schema}}/{{table}}"
if config.source_url is None
else config.source_url
)

self._api = create_api(f"https://{config.hostname}", config.token)
self._connection = create_connection(
self._api,
self._token,
config.warehouse_id,
cluster_hostname=config.host,
cluster_path=config.cluster_path,
token=config.token,
server_hostname=config.hostname,
http_path=config.http_path,
)

self._datasets: Dict[str, Dataset] = {}
Expand Down Expand Up @@ -173,12 +174,7 @@ def _get_table_infos(
yield table

def _get_source_url(self, database: str, schema_name: str, table_name: str):
url = (
f"{self._host}/explore/data/{{catalog}}/{{schema}}/{{table}}"
if self._source_url is None
else self._source_url
)

url = self._source_url
url = URL_DATABASE_RE.sub(urllib.parse.quote(database), url)
url = URL_SCHEMA_RE.sub(urllib.parse.quote(schema_name), url)
url = URL_TABLE_RE.sub(urllib.parse.quote(table_name), url)
Expand Down
5 changes: 4 additions & 1 deletion metaphor/unity_catalog/profile/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ Create a YAML config file based on the following template.
### Required Configurations

```yaml
host: <workspace_url>
hostname: <cluster_or_warehouse_hostname>
http_path: <http_path>
token: <access_token>
```
See [this page](https://docs.databricks.com/en/integrations/compute-details.html) for details on how to find the values for `hosthost` and `http_path`.

### Optional Configurations

See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config.
Expand Down
6 changes: 2 additions & 4 deletions metaphor/unity_catalog/profile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,9 @@ def from_config_file(config_file: str) -> "UnityCatalogProfileExtractor":

def __init__(self, config: UnityCatalogRunConfig):
super().__init__(config)
self._token = config.token
self._host = config.host
self._api = create_api(config.host, config.token)
self._api = create_api(f"https://{config.hostname}", config.token)
self._connection = create_connection(
self._api, config.token, config.warehouse_id
config.token, config.hostname, config.http_path
)
self._filter = config.filter.normalize().merge(DEFAULT_FILTER)

Expand Down
29 changes: 2 additions & 27 deletions metaphor/unity_catalog/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,10 @@ def escape_special_characters(name: str) -> str:


def create_connection(
client: WorkspaceClient,
token: str,
warehouse_id: Optional[str] = None,
cluster_hostname: Optional[str] = None,
cluster_path: Optional[str] = None,
server_hostname: Optional[str] = None,
http_path: Optional[str] = None,
) -> Connection:
server_hostname = cluster_hostname
http_path = cluster_path

if cluster_hostname is None and cluster_path is None:
logger.warning("No cluster configuration is found, fallback to SQL warehouse")

endpoints = list(client.warehouses.list())
if not endpoints:
raise ValueError(
"No valid warehouse nor valid cluster configuration is provided"
)

endpoint_info = endpoints[0]

if warehouse_id:
try:
endpoint_info = client.warehouses.get(warehouse_id)
except Exception:
raise ValueError(f"Invalid warehouse id: {warehouse_id}")

server_hostname = endpoint_info.odbc_params.hostname
http_path = endpoint_info.odbc_params.path

return sql.connect(
server_hostname=server_hostname,
http_path=http_path,
Expand Down
3 changes: 2 additions & 1 deletion tests/unity_catalog/config.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
---
host: host
hostname: hostname
http_path: path
token: token
source_url: http://foo.bar/{catalog}/{schema}/{table}
output: {}
4 changes: 2 additions & 2 deletions tests/unity_catalog/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
"sourceInfo": {
"createdAtSource": "1970-01-01T00:00:00+00:00",
"lastUpdated": "1970-01-01T00:00:00+00:00",
"mainUrl": "http://dummy.host/explore/data/catalog/schema/table"
"mainUrl": "https://dummy.host/explore/data/catalog/schema/table"
},
"structure": {
"database": "catalog",
Expand Down Expand Up @@ -125,7 +125,7 @@
"sourceInfo": {
"createdAtSource": "1970-01-01T00:00:00+00:00",
"lastUpdated": "1970-01-01T00:00:00+00:00",
"mainUrl": "http://dummy.host/explore/data/catalog/schema/view"
"mainUrl": "https://dummy.host/explore/data/catalog/schema/view"
},
"structure": {
"database": "catalog",
Expand Down
24 changes: 2 additions & 22 deletions tests/unity_catalog/profile/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from unittest.mock import MagicMock, patch

import pytest
from databricks.sdk.core import DatabricksError
from databricks.sdk.service.catalog import (
CatalogInfo,
ColumnInfo,
Expand All @@ -17,16 +16,15 @@
from metaphor.common.event_util import EventUtil
from metaphor.unity_catalog.config import UnityCatalogRunConfig
from metaphor.unity_catalog.profile.extractor import UnityCatalogProfileExtractor
from metaphor.unity_catalog.utils import create_connection
from tests.test_utils import load_json


def dummy_config():
return UnityCatalogRunConfig(
host="http://dummy.host",
hostname="dummy.host",
http_path="path",
token="",
output=OutputConfig(),
warehouse_id=None,
)


Expand Down Expand Up @@ -103,21 +101,3 @@ async def test_extractor(
events = [EventUtil.trim_event(e) for e in await extractor.extract()]

assert events == load_json(f"{test_root_dir}/unity_catalog/profile/expected.json")


def test_bad_warehouse():
client = MagicMock()
client.warehouses = MagicMock()
client.warehouses.list = MagicMock()
client.warehouses.list.return_value = iter([])

with pytest.raises(ValueError):
create_connection(client, "token", None)

client.warehouses.list.return_value = iter(["530e470a55aeb40d"])
client.warehouses.get = MagicMock(
side_effect=DatabricksError("SQL warehouse 530e470a55aeb40e does not exist.")
)

with pytest.raises(ValueError):
create_connection(client, "token", "530e470a55aeb40e")
3 changes: 2 additions & 1 deletion tests/unity_catalog/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ def test_yaml_config_password(test_root_dir):
)

assert config == UnityCatalogRunConfig(
host="host",
hostname="hostname",
http_path="path",
token="token",
source_url="http://foo.bar/{catalog}/{schema}/{table}",
output=OutputConfig(),
Expand Down
5 changes: 3 additions & 2 deletions tests/unity_catalog/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@

def dummy_config():
return UnityCatalogRunConfig(
host="http://dummy.host",
hostname="dummy.host",
http_path="path",
token="",
output=OutputConfig(),
)
Expand Down Expand Up @@ -246,7 +247,7 @@ def test_source_url(
extractor = UnityCatalogExtractor(config)
assert (
extractor._get_source_url("db", "schema", "table")
== "http://dummy.host/explore/data/db/schema/table"
== "https://dummy.host/explore/data/db/schema/table"
)

# Manual override with escaped characters
Expand Down

0 comments on commit c4e024d

Please sign in to comment.