Skip to content

Commit

Permalink
Add config to disable queries that require SELECT permission (#1030)
Browse files Browse the repository at this point in the history
  • Loading branch information
mars-lan authored Nov 4, 2024
1 parent 5f381a1 commit 658b20f
Show file tree
Hide file tree
Showing 12 changed files with 58 additions and 38 deletions.
5 changes: 5 additions & 0 deletions metaphor/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ def safe_str(value: Any) -> Optional[str]:
return None if value is None else str(value)


def non_empty_str(value: Any) -> Optional[str]:
    """Return ``str(value)``, or None when the value is None or equals the empty string."""
    # Guard clauses: both "missing" representations collapse to None.
    if value is None:
        return None
    if value == "":
        return None
    return str(value)


def safe_float(value: Optional[Union[float, int, str]]) -> Optional[float]:
"""Converts a value to float, return None if the original value is None or NaN or INF"""
return (
Expand Down
16 changes: 4 additions & 12 deletions metaphor/unity_catalog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ See [Output Config](../common/docs/output.md) for more information.

See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config.

#### SELECT Permissions

Certain metadata, such as table properties and last refreshed time, can only be extracted if the user has SELECT permissions on the table. By default, the connector attempts to extract this metadata and logs an error if the extraction fails. To skip these queries, set `has_select_permissions` to `false`.

#### Source URL

By default, each table is associated with a Unity Catalog URL derived from the `hostname` config.
Expand Down Expand Up @@ -67,18 +71,6 @@ query_log:

See [Process Query](../common/docs/process_query.md) for more information on the optional `process_query_config` config.

#### Warehouse ID

Note: we encourage using a cluster instead; this connector will deprecate SQL warehouse support.

To run the queries using a specific warehouse, simply add its ID in the configuration file:

```yaml
warehouse_id: <warehouse_id>
```

If neither a warehouse ID nor a cluster path is provided, the connector automatically uses the first discovered warehouse.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `unity_catalog` extra.
Expand Down
3 changes: 3 additions & 0 deletions metaphor/unity_catalog/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ class UnityCatalogRunConfig(BaseConfig):
# Include or exclude specific databases/schemas/tables
filter: DatasetFilter = field(default_factory=lambda: DatasetFilter())

# Whether the user has SELECT permissions on the tables
has_select_permissions: bool = True

# configs for fetching query logs
query_log: UnityCatalogQueryLogConfig = field(
default_factory=lambda: UnityCatalogQueryLogConfig()
Expand Down
20 changes: 12 additions & 8 deletions metaphor/unity_catalog/extractor.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import json
import re
import urllib.parse
from typing import Collection, Dict, Iterator, List, Optional
Expand All @@ -15,7 +16,7 @@
from metaphor.common.fieldpath import build_schema_field
from metaphor.common.logger import get_logger
from metaphor.common.models import to_dataset_statistics
from metaphor.common.utils import safe_float
from metaphor.common.utils import non_empty_str, safe_float
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
AssetPlatform,
Expand Down Expand Up @@ -152,6 +153,8 @@ def __init__(self, config: UnityCatalogRunConfig):
self._describe_history_limit = config.describe_history_limit
self._max_concurrency = config.max_concurrency

self._has_select_permissions = config.has_select_permissions

self._api = create_api(f"https://{config.hostname}", config.token)

self._connection = create_connection(
Expand Down Expand Up @@ -217,9 +220,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]:

self._propagate_tags()

# Batch query table properties and last refreshed time
self._populate_table_properties()
self._populate_last_refreshed_time()
# Batch query table properties and last refreshed time if granted SELECT permissions
if self._has_select_permissions:
self._populate_table_properties()
self._populate_last_refreshed_time()

entities: List[ENTITY_TYPES] = []
entities.extend(self._datasets.values())
Expand Down Expand Up @@ -269,7 +273,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset:

dataset.schema = DatasetSchema(
schema_type=SchemaType.SQL,
description=table_info.comment,
description=non_empty_str(table_info.comment),
fields=fields,
sql_schema=SQLSchema(
materialization=TABLE_TYPE_MATERIALIZATION_TYPE_MAP.get(
Expand Down Expand Up @@ -338,7 +342,7 @@ def _init_column(self, column_info: ColumnInfo) -> SchemaField:
field = build_schema_field(
column_name=column_info.column_name,
field_type=column_info.data_type,
description=column_info.comment,
description=non_empty_str(column_info.comment),
nullable=column_info.is_nullable,
precision=safe_float(column_info.data_precision),
)
Expand Down Expand Up @@ -570,7 +574,7 @@ def _init_volume(self, volume: VolumeInfo):
)

dataset.schema = DatasetSchema(
description=volume.comment,
description=non_empty_str(volume.comment),
)

if volume.owner:
Expand Down Expand Up @@ -701,7 +705,7 @@ def _populate_table_properties(self):
continue

dataset.unity_catalog.table_info.properties = [
KeyValuePair(key=k, value=v) for k, v in properties.items()
KeyValuePair(key=k, value=json.dumps(v)) for k, v in properties.items()
]

def _get_owner_display_name(self, user_id: str) -> str:
Expand Down
4 changes: 2 additions & 2 deletions metaphor/unity_catalog/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,7 @@ def get_last_refreshed_time(
try:
cursor.execute(f"DESCRIBE HISTORY {table_full_name} LIMIT {limit}")
except Exception as error:
logger.exception(f"Failed to get history for {table_full_name}: {error}")
logger.error(f"Failed to get history for {table_full_name}: {error}")
return None

for history in cursor.fetchall():
Expand All @@ -704,7 +704,7 @@ def get_table_properties(
try:
cursor.execute(f"SHOW TBLPROPERTIES {table_full_name}")
except Exception as error:
logger.exception(
logger.error(
f"Failed to show table properties for {table_full_name}: {error}"
)
return None
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.144"
version = "0.14.145"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
8 changes: 8 additions & 0 deletions tests/common/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
is_email,
must_set_at_least_one,
must_set_exactly_one,
non_empty_str,
removesuffix,
safe_float,
safe_int,
Expand Down Expand Up @@ -145,6 +146,13 @@ def test_safe_str():
assert safe_str(100) == "100"


def test_non_empty_str():
    # None and the empty string are both treated as "no value".
    for missing in (None, ""):
        assert non_empty_str(missing) is None

    # Any other input is converted with str().
    assert non_empty_str("string") == "string"
    assert non_empty_str(100) == "100"


def test_safe_float():
assert safe_float(None) is None
assert safe_float(float("NaN")) is None
Expand Down
2 changes: 2 additions & 0 deletions tests/great_expectations/basic_sql/gx/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

uncommitted/
2 changes: 2 additions & 0 deletions tests/great_expectations/snowflake/gx/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@

uncommitted/
2 changes: 2 additions & 0 deletions tests/unity_catalog/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ hostname: hostname
http_path: path
token: token
source_url: http://foo.bar/{catalog}/{schema}/{table}
has_select_permissions: false
describe_history_limit: 30
max_concurrency: 20
output: {}
30 changes: 15 additions & 15 deletions tests/unity_catalog/expected.json
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@
"properties": [
{
"key": "delta.lastCommitTimestamp",
"value": "1664444422000"
"value": "\"1664444422000\""
}
]
}
Expand Down Expand Up @@ -286,59 +286,59 @@
"properties": [
{
"key": "view.catalogAndNamespace.numParts",
"value": "2"
"value": "\"2\""
},
{
"key": "view.sqlConfig.spark.sql.hive.convertCTAS",
"value": "true"
"value": "\"true\""
},
{
"key": "view.query.out.col.0",
"value": "key"
"value": "\"key\""
},
{
"key": "view.sqlConfig.spark.sql.parquet.compression.codec",
"value": "snappy"
"value": "\"snappy\""
},
{
"key": "view.query.out.numCols",
"value": "3"
"value": "\"3\""
},
{
"key": "view.referredTempViewNames",
"value": "[]"
"value": "\"[]\""
},
{
"key": "view.query.out.col.1",
"value": "values"
"value": "\"values\""
},
{
"key": "view.sqlConfig.spark.sql.streaming.stopTimeout",
"value": "15s"
"value": "\"15s\""
},
{
"key": "view.catalogAndNamespace.part.0",
"value": "catalog"
"value": "\"catalog\""
},
{
"key": "view.sqlConfig.spark.sql.sources.commitProtocolClass",
"value": "com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol"
"value": "\"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\""
},
{
"key": "view.sqlConfig.spark.sql.sources.default",
"value": "delta"
"value": "\"delta\""
},
{
"key": "view.sqlConfig.spark.sql.legacy.createHiveTableByDefault",
"value": "false"
"value": "\"false\""
},
{
"key": "view.query.out.col.2",
"value": "nested_values"
"value": "\"nested_values\""
},
{
"key": "view.catalogAndNamespace.part.1",
"value": "default"
"value": "\"default\""
}
]
}
Expand Down
2 changes: 2 additions & 0 deletions tests/unity_catalog/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ def test_yaml_config_password(test_root_dir):
http_path="path",
token="token",
source_url="http://foo.bar/{catalog}/{schema}/{table}",
has_select_permissions=False,
describe_history_limit=30,
max_concurrency=20,
output=OutputConfig(),
)

0 comments on commit 658b20f

Please sign in to comment.