diff --git a/metaphor/common/utils.py b/metaphor/common/utils.py index 4dc405c6..c2bf85e1 100644 --- a/metaphor/common/utils.py +++ b/metaphor/common/utils.py @@ -87,6 +87,11 @@ def safe_str(value: Any) -> Optional[str]: return None if value is None else str(value) +def non_empty_str(value: Any) -> Optional[str]: + """Converts a value to str, return None if the original value is None or empty string""" + return None if value is None or value == "" else str(value) + + def safe_float(value: Optional[Union[float, int, str]]) -> Optional[float]: """Converts a value to float, return None if the original value is None or NaN or INF""" return ( diff --git a/metaphor/unity_catalog/README.md b/metaphor/unity_catalog/README.md index 2fab2d6c..c54f6de9 100644 --- a/metaphor/unity_catalog/README.md +++ b/metaphor/unity_catalog/README.md @@ -34,6 +34,10 @@ See [Output Config](../common/docs/output.md) for more information. See [Filter Configurations](../common/docs/filter.md) for more information on the optional `filter` config. +#### SELECT Permissions + +Certain metadata, such as table properties & last refreshed time, can only be extracted if the user has SELECT permissions on the table. By default, the connector will attempt to extract this metadata and print errors in the logs if it fails. To disable this behavior, set `has_select_permissions` to `false`. + #### Source URL By default, each table is associated with a Unity Catalog URL derived from the `hostname` config. @@ -67,18 +71,6 @@ query_log: See [Process Query](../common/docs/process_query.md) for more information on the optional `process_query_config` config. -#### Warehouse ID - -Note: we encourage using cluster, this connector will deprecate the SQL warehouse support. - -To run the queries using a specific warehouse, simply add its ID in the configuration file: - -```yaml -warehouse_id: -``` - -If no warehouse id nor cluster path is provided, the connector automatically uses the first discovered warehouse. - ## Testing Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `unity_catalog` extra. diff --git a/metaphor/unity_catalog/config.py b/metaphor/unity_catalog/config.py index 22fb0ead..97ad6eae 100644 --- a/metaphor/unity_catalog/config.py +++ b/metaphor/unity_catalog/config.py @@ -41,6 +41,9 @@ class UnityCatalogRunConfig(BaseConfig): # Include or exclude specific databases/schemas/tables filter: DatasetFilter = field(default_factory=lambda: DatasetFilter()) + # Whether the user has SELECT permissions on the tables + has_select_permissions: bool = True + # configs for fetching query logs query_log: UnityCatalogQueryLogConfig = field( default_factory=lambda: UnityCatalogQueryLogConfig() diff --git a/metaphor/unity_catalog/extractor.py b/metaphor/unity_catalog/extractor.py index d47b073a..a148c972 100644 --- a/metaphor/unity_catalog/extractor.py +++ b/metaphor/unity_catalog/extractor.py @@ -1,3 +1,4 @@ +import json import re import urllib.parse from typing import Collection, Dict, Iterator, List, Optional @@ -15,7 +16,7 @@ from metaphor.common.fieldpath import build_schema_field from metaphor.common.logger import get_logger from metaphor.common.models import to_dataset_statistics -from metaphor.common.utils import safe_float +from metaphor.common.utils import non_empty_str, safe_float from metaphor.models.crawler_run_metadata import Platform from metaphor.models.metadata_change_event import ( AssetPlatform, @@ -152,6 +153,8 @@ def __init__(self, config: UnityCatalogRunConfig): self._describe_history_limit = config.describe_history_limit self._max_concurrency = config.max_concurrency + self._has_select_permissions = config.has_select_permissions + self._api = create_api(f"https://{config.hostname}", config.token) self._connection = create_connection( @@ -217,9 +220,10 @@ async def extract(self) -> Collection[ENTITY_TYPES]: self._propagate_tags() - # Batch query table properties and last refreshed time - self._populate_table_properties() - self._populate_last_refreshed_time() + # Batch query table properties and last refreshed time if granted SELECT permissions + if self._has_select_permissions: + self._populate_table_properties() + self._populate_last_refreshed_time() entities: List[ENTITY_TYPES] = [] entities.extend(self._datasets.values()) @@ -269,7 +273,7 @@ def _init_dataset(self, table_info: TableInfo) -> Dataset: dataset.schema = DatasetSchema( schema_type=SchemaType.SQL, - description=table_info.comment, + description=non_empty_str(table_info.comment), fields=fields, sql_schema=SQLSchema( materialization=TABLE_TYPE_MATERIALIZATION_TYPE_MAP.get( @@ -338,7 +342,7 @@ def _init_column(self, column_info: ColumnInfo) -> SchemaField: field = build_schema_field( column_name=column_info.column_name, field_type=column_info.data_type, - description=column_info.comment, + description=non_empty_str(column_info.comment), nullable=column_info.is_nullable, precision=safe_float(column_info.data_precision), ) @@ -570,7 +574,7 @@ def _init_volume(self, volume: VolumeInfo): ) dataset.schema = DatasetSchema( - description=volume.comment, + description=non_empty_str(volume.comment), ) if volume.owner: @@ -701,7 +705,7 @@ def _populate_table_properties(self): continue dataset.unity_catalog.table_info.properties = [ - KeyValuePair(key=k, value=v) for k, v in properties.items() + KeyValuePair(key=k, value=json.dumps(v)) for k, v in properties.items() ] def _get_owner_display_name(self, user_id: str) -> str: diff --git a/metaphor/unity_catalog/queries.py b/metaphor/unity_catalog/queries.py index 3f9deba9..3943f6f5 100644 --- a/metaphor/unity_catalog/queries.py +++ b/metaphor/unity_catalog/queries.py @@ -677,7 +677,7 @@ def get_last_refreshed_time( try: cursor.execute(f"DESCRIBE HISTORY {table_full_name} LIMIT {limit}") except Exception as error: - logger.exception(f"Failed to get history for {table_full_name}: {error}") + logger.error(f"Failed to get history for {table_full_name}: {error}") return None for history in cursor.fetchall(): @@ -704,7 +704,7 @@ def get_table_properties( try: cursor.execute(f"SHOW TBLPROPERTIES {table_full_name}") except Exception as error: - logger.exception( + logger.error( f"Failed to show table properties for {table_full_name}: {error}" ) return None diff --git a/pyproject.toml b/pyproject.toml index 03d850b9..09c0f9f2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.144" +version = "0.14.145" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/common/test_utils.py b/tests/common/test_utils.py index 9a8659fa..548bf575 100644 --- a/tests/common/test_utils.py +++ b/tests/common/test_utils.py @@ -11,6 +11,7 @@ is_email, must_set_at_least_one, must_set_exactly_one, + non_empty_str, removesuffix, safe_float, safe_int, @@ -145,6 +146,13 @@ def test_safe_str(): assert safe_str(100) == "100" +def test_non_empty_str(): + assert non_empty_str(None) is None + assert non_empty_str("") is None + assert non_empty_str("string") == "string" + assert non_empty_str(100) == "100" + + def test_safe_float(): assert safe_float(None) is None assert safe_float(float("NaN")) is None diff --git a/tests/great_expectations/basic_sql/gx/.gitignore b/tests/great_expectations/basic_sql/gx/.gitignore index e69de29b..f34131a7 100644 --- a/tests/great_expectations/basic_sql/gx/.gitignore +++ b/tests/great_expectations/basic_sql/gx/.gitignore @@ -0,0 +1,2 @@ + +uncommitted/ \ No newline at end of file diff --git a/tests/great_expectations/snowflake/gx/.gitignore b/tests/great_expectations/snowflake/gx/.gitignore index e69de29b..f34131a7 100644 --- a/tests/great_expectations/snowflake/gx/.gitignore +++ b/tests/great_expectations/snowflake/gx/.gitignore @@ -0,0 +1,2 @@ + +uncommitted/ \ No newline at end of file diff --git a/tests/unity_catalog/config.yml b/tests/unity_catalog/config.yml index 2aff33a5..84b341e4 100644 --- a/tests/unity_catalog/config.yml +++ b/tests/unity_catalog/config.yml @@ -3,5 +3,7 @@ hostname: hostname http_path: path token: token source_url: http://foo.bar/{catalog}/{schema}/{table} +has_select_permissions: false describe_history_limit: 30 +max_concurrency: 20 output: {} diff --git a/tests/unity_catalog/expected.json b/tests/unity_catalog/expected.json index 0d8168dd..cb33147f 100644 --- a/tests/unity_catalog/expected.json +++ b/tests/unity_catalog/expected.json @@ -204,7 +204,7 @@ "properties": [ { "key": "delta.lastCommitTimestamp", - "value": "1664444422000" + "value": "\"1664444422000\"" } ] } @@ -286,59 +286,59 @@ "properties": [ { "key": "view.catalogAndNamespace.numParts", - "value": "2" + "value": "\"2\"" }, { "key": "view.sqlConfig.spark.sql.hive.convertCTAS", - "value": "true" + "value": "\"true\"" }, { "key": "view.query.out.col.0", - "value": "key" + "value": "\"key\"" }, { "key": "view.sqlConfig.spark.sql.parquet.compression.codec", - "value": "snappy" + "value": "\"snappy\"" }, { "key": "view.query.out.numCols", - "value": "3" + "value": "\"3\"" }, { "key": "view.referredTempViewNames", - "value": "[]" + "value": "\"[]\"" }, { "key": "view.query.out.col.1", - "value": "values" + "value": "\"values\"" }, { "key": "view.sqlConfig.spark.sql.streaming.stopTimeout", - "value": "15s" + "value": "\"15s\"" }, { "key": "view.catalogAndNamespace.part.0", - "value": "catalog" + "value": "\"catalog\"" }, { "key": "view.sqlConfig.spark.sql.sources.commitProtocolClass", - "value": "com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol" + "value": "\"com.databricks.sql.transaction.directory.DirectoryAtomicCommitProtocol\"" }, { "key": "view.sqlConfig.spark.sql.sources.default", - "value": "delta" + "value": "\"delta\"" }, { "key": "view.sqlConfig.spark.sql.legacy.createHiveTableByDefault", - "value": "false" + "value": "\"false\"" }, { "key": "view.query.out.col.2", - "value": "nested_values" + "value": "\"nested_values\"" }, { "key": "view.catalogAndNamespace.part.1", - "value": "default" + "value": "\"default\"" } ] } diff --git a/tests/unity_catalog/test_config.py b/tests/unity_catalog/test_config.py index 6bd9f7e0..87468baf 100644 --- a/tests/unity_catalog/test_config.py +++ b/tests/unity_catalog/test_config.py @@ -12,6 +12,8 @@ def test_yaml_config_password(test_root_dir): http_path="path", token="token", source_url="http://foo.bar/{catalog}/{schema}/{table}", + has_select_permissions=False, describe_history_limit=30, + max_concurrency=20, output=OutputConfig(), )