From 366a12a11cb48af09f86a5b6584ab000abd072a1 Mon Sep 17 00:00:00 2001 From: Tsung-Ju Lii Date: Fri, 26 Jan 2024 01:55:00 +0800 Subject: [PATCH] Alation crawler (#761) * Alation crawler * add extractor test * add test for client * add test for config * bump version * add readme * parse steward to owner * address comment * skip tables from unsupported datasources * bump version * bump version * bump version --- metaphor/alation/README.md | 57 ++++++++++ metaphor/alation/__init__.py | 6 ++ metaphor/alation/client.py | 24 +++++ metaphor/alation/config.py | 27 +++++ metaphor/alation/extractor.py | 162 ++++++++++++++++++++++ metaphor/alation/schema.py | 115 ++++++++++++++++++++ pyproject.toml | 2 +- tests/alation/__init__.py | 0 tests/alation/expected.json | 180 ++++++++++++++++++++++++++++++++ tests/alation/test_client.py | 22 ++++ tests/alation/test_config.py | 19 ++++ tests/alation/test_extractor.py | 179 +++++++++++++++++++++++++++++++ 12 files changed, 792 insertions(+), 1 deletion(-) create mode 100644 metaphor/alation/README.md create mode 100644 metaphor/alation/__init__.py create mode 100644 metaphor/alation/client.py create mode 100644 metaphor/alation/config.py create mode 100644 metaphor/alation/extractor.py create mode 100644 metaphor/alation/schema.py create mode 100644 tests/alation/__init__.py create mode 100644 tests/alation/expected.json create mode 100644 tests/alation/test_client.py create mode 100644 tests/alation/test_config.py create mode 100644 tests/alation/test_extractor.py diff --git a/metaphor/alation/README.md b/metaphor/alation/README.md new file mode 100644 index 00000000..693d2665 --- /dev/null +++ b/metaphor/alation/README.md @@ -0,0 +1,57 @@ +# Alation Connector + +This connector extracts user-generated metadata from Alation. + +## Setup + +To run the connector, you must have an API token. 
Follow [the official documentation](https://developer.alation.com/dev/docs/authentication-into-alation-apis) to generate such a token. + +## Config File + +Create a YAML config file based on the following template. + +### Required Configurations + +```yaml +url: +token: +output: + file: + directory: +``` + +See [Output Config](../common/docs/output.md) for more information on `output`. + +### Optional Configurations + +#### Snowflake, MSSQL and Synapse Account + +If there are data sources from Snowflake, MSSQL or Synapse, please provide their accounts as follows, + +```yaml +snowflake_account: +mssql_account: +synapse_account: +``` + +#### Description Author Email + +Alation does not keep track of the description authors. You can specify the description author email in the configuration file: + +```yaml +description_author_email: +``` + +If not provided, each dataset's first owner will be considered as the author. If no owner exists for a dataset, the placeholder email `admin@metaphor.io` will be used. + +## Testing + +Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either `all` or `alation` extra. + +To test the connector locally, change the config file to output to a local path and run the following command + +```shell +metaphor alation +``` + +Manually verify the output after the run finishes. 
diff --git a/metaphor/alation/__init__.py b/metaphor/alation/__init__.py new file mode 100644 index 00000000..964b67e0 --- /dev/null +++ b/metaphor/alation/__init__.py @@ -0,0 +1,6 @@ +from metaphor.alation.extractor import AlationExtractor +from metaphor.common.cli import cli_main + + +def main(config_file: str): + cli_main(AlationExtractor, config_file) diff --git a/metaphor/alation/client.py b/metaphor/alation/client.py new file mode 100644 index 00000000..d70add2e --- /dev/null +++ b/metaphor/alation/client.py @@ -0,0 +1,24 @@ +from typing import Any, Dict, Iterator, Optional + +import requests + + +class Client: + def __init__(self, base_url: str, headers: Dict[str, str]) -> None: + self.base_url = base_url + self.headers = headers + + def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Iterator[Any]: + url: Optional[str] = f"{self.base_url}/{path}" + while url: + response = requests.get( + url, + params=params if params else None, + headers=self.headers, + timeout=10, + ) + response.raise_for_status() + url = response.headers.get("X-Next-Page") + # `response.json()` is a list of objects, we're returning them as an iterator + for obj in response.json(): + yield obj diff --git a/metaphor/alation/config.py b/metaphor/alation/config.py new file mode 100644 index 00000000..91b64d1d --- /dev/null +++ b/metaphor/alation/config.py @@ -0,0 +1,27 @@ +from typing import Optional + +from pydantic.dataclasses import dataclass + +from metaphor.common.base_config import BaseConfig +from metaphor.common.dataclass import ConnectorConfig +from metaphor.models.metadata_change_event import DataPlatform + + +@dataclass(config=ConnectorConfig) +class AlationConfig(BaseConfig): + url: str + token: str + + description_author_email: Optional[str] = None + snowflake_account: Optional[str] = None + mssql_account: Optional[str] = None + synapse_account: Optional[str] = None + + def get_account(self, data_platform: DataPlatform) -> Optional[str]: + if data_platform is 
DataPlatform.SNOWFLAKE: + return self.snowflake_account + if data_platform is DataPlatform.MSSQL: + return self.mssql_account + if data_platform is DataPlatform.SYNAPSE: + return self.synapse_account + return None diff --git a/metaphor/alation/extractor.py b/metaphor/alation/extractor.py new file mode 100644 index 00000000..01b61539 --- /dev/null +++ b/metaphor/alation/extractor.py @@ -0,0 +1,162 @@ +from typing import Any, Collection, Dict, List, Literal, Optional + +from metaphor.alation.client import Client +from metaphor.alation.config import AlationConfig +from metaphor.alation.schema import Column, Datasource, Schema, Steward, Table +from metaphor.common.base_extractor import BaseExtractor +from metaphor.common.entity_id import dataset_normalized_name +from metaphor.common.event_util import ENTITY_TYPES +from metaphor.common.logger import get_logger +from metaphor.models.metadata_change_event import ( + ColumnTagAssignment, + DataPlatform, + Dataset, + DatasetLogicalID, + EntityType, + Ownership, + OwnershipAssignment, + TagAssignment, +) + +logger = get_logger() + + +class AlationExtractor(BaseExtractor): + """Alation metadata extractor""" + + @staticmethod + def from_config_file(config_file: str) -> "AlationExtractor": + return AlationExtractor(AlationConfig.from_yaml_file(config_file)) + + _description = "Alation metadata connector" + _platform = None + + def __init__(self, config: AlationConfig) -> None: + super().__init__(config) + self._config = config + self._client = Client(config.url, {"Token": config.token}) + self._entities: List[ENTITY_TYPES] = [] + self._schemas: Dict[int, str] = {} # schema_id to name + self._datasources: Dict[int, Datasource] = {} # ds_id to datasource + + def _get_schema(self, schema_id: int) -> str: + if schema_id not in self._schemas: + schema_obj = next( + self._client.get("integration/v2/schema/", {"id": schema_id}), None + ) + if not schema_obj: + raise ValueError(f"No schema found with id = {schema_id}") + schema = 
Schema.model_validate(schema_obj) + self._schemas[schema_id] = schema.qualified_name + + return self._schemas[schema_id] + + def _get_datasource(self, ds_id: int): + if ds_id not in self._datasources: + ds_obj = next(self._client.get(f"integration/v1/datasource/{ds_id}/"), None) + if not ds_obj: + if ( + next(self._client.get(f"integration/v2/datasource/{ds_id}/"), None) + is not None + ): + # TODO: support OCF datasources + raise RuntimeError("OCF datasources are currently not supported") + + raise ValueError(f"No datasource found with id = {ds_id}") + datasource = Datasource.model_validate(ds_obj) + self._datasources[ds_id] = datasource + return self._datasources[ds_id] + + def _get_table_columns(self, table_id: int): + return [ + Column.model_validate(column_obj) + for column_obj in self._client.get( + "integration/v2/column/", params={"table_id": table_id} + ) + ] + + def _get_tags(self, oid: int, otype: Literal["attribute", "table"]): + return [ + str(tag_obj["name"]) + for tag_obj in self._client.get( + "integration/tag/", params={"oid": oid, "otype": otype} + ) + ] + + def _get_tag_assignment(self, table: Table, columns: List[Column]): + tag_assignment = None + table_tags = self._get_tags(oid=table.id, otype="table") + column_tag_assignments: List[ColumnTagAssignment] = [] + for column in columns: + column_tags = self._get_tags(column.id, otype="attribute") + if column_tags: + column_tag_assignments.append( + ColumnTagAssignment( + column_name=column.name, + tag_names=column_tags, + ) + ) + if table_tags or column_tag_assignments: + tag_assignment = TagAssignment( + column_tag_assignments=column_tag_assignments + if column_tag_assignments + else None, + tag_names=table_tags, + ) + return tag_assignment + + def _get_ownership_assignment(self, steward: Optional[Steward]): + if not steward: + return None + + if steward.otype == "groupprofile": + path = f"integration/v1/group/{steward.oid}" + else: + path = f"integration/v1/user/{steward.oid}" + + owner = 
next(self._client.get(path), None) + if not owner: + raise ValueError( + f"No owner found for this steward, steward id = {steward.oid}, type =- {steward.otype}" + ) + + return OwnershipAssignment( + ownerships=[ + Ownership( + contact_designation_name=owner["display_name"], + person=owner["email"], + ) + ] + ) + + def _init_dataset(self, table_obj: Any) -> Dataset: + table = Table.model_validate(table_obj) + schema = self._get_schema(table.schema_id) + datasource = self._get_datasource(table.ds_id) + if datasource.platform == DataPlatform.EXTERNAL: + raise RuntimeError(f"Unsupported datasource dbtype: {datasource.dbtype}") + columns = self._get_table_columns(table.id) + name = dataset_normalized_name(datasource.dbname, schema, table.name) + dataset = Dataset( + entity_type=EntityType.DATASET, + logical_id=DatasetLogicalID( + account=self._config.get_account(datasource.platform), + name=name, + platform=datasource.platform, + ), + description_assignment=table.description_assignment( + self._config.description_author_email, columns + ), + tag_assignment=self._get_tag_assignment(table, columns), + ownership_assignment=self._get_ownership_assignment(table.steward), + ) + return dataset + + async def extract(self) -> Collection[ENTITY_TYPES]: + for table_obj in self._client.get("integration/v2/table/", {"limit": 100}): + try: + self._entities.append(self._init_dataset(table_obj)) + except Exception: + logger.exception(f"Cannot extract table, obj = {table_obj}") + + return self._entities diff --git a/metaphor/alation/schema.py b/metaphor/alation/schema.py new file mode 100644 index 00000000..1d8203f6 --- /dev/null +++ b/metaphor/alation/schema.py @@ -0,0 +1,115 @@ +from typing import Any, List, Literal, Optional + +from pydantic import BaseModel, Field + +from metaphor.models.metadata_change_event import ( + AssetDescription, + ColumnDescriptionAssignment, + DataPlatform, + DescriptionAssignment, +) + + +class CustomField(BaseModel): + value: Any + field_id: int + 
field_name: str + + +class Steward(BaseModel): + oid: int + otype: Literal["user", "groupprofile"] + + +class Column(BaseModel): + id: int + name: str + description: Optional[str] = None + custom_fields: List[CustomField] = Field(default_factory=list) + + +class Table(BaseModel): + id: int + name: str + description: Optional[str] = None + ds_id: int + custom_fields: List[CustomField] = Field(default_factory=list) + table_type: str + schema_id: int + + @property + def steward(self) -> Optional[Steward]: + steward_obj = next( + (f.value[0] for f in self.custom_fields if f.field_name == "Steward"), None + ) + if steward_obj: + steward = Steward.model_validate(steward_obj) + return steward + return None + + def description_assignment(self, author: Optional[str], columns: List[Column]): + asset_descriptions: Optional[List[AssetDescription]] = None + if self.description: + asset_descriptions = [ + AssetDescription(author=author, description=self.description) + ] + + column_description_assignments: List[ColumnDescriptionAssignment] = [] + for column in columns: + if column.description: + column_description_assignments.append( + ColumnDescriptionAssignment( + asset_descriptions=[ + AssetDescription( + author=author, description=column.description + ) + ], + column_name=column.name, + ) + ) + + description_assignment: Optional[DescriptionAssignment] = None + if asset_descriptions or column_description_assignments: + description_assignment = DescriptionAssignment( + asset_descriptions=asset_descriptions, + column_description_assignments=column_description_assignments + if column_description_assignments + else None, + ) + + return description_assignment + + +class Schema(BaseModel): + ds_id: int + key: str + """ + `ds_id`.`qualified_name` + """ + + @property + def qualified_name(self) -> str: + # Qualified name of the schema + return self.key[len(str(self.ds_id)) + 1 :] + + +class Datasource(BaseModel): + dbtype: str + """ + Currently the certified types are mysql, oracle, 
postgresql, sqlserver, redshift, teradata and snowflake + """ + dbname: Optional[str] = None + + @property + def platform(self) -> DataPlatform: + if self.dbtype == "mysql": + return DataPlatform.MYSQL + if self.dbtype == "postgresql": + return DataPlatform.POSTGRESQL + if self.dbtype == "sqlserver": + return DataPlatform.MSSQL + if self.dbtype == "redshift": + return DataPlatform.REDSHIFT + if self.dbtype == "snowflake": + return DataPlatform.SNOWFLAKE + return DataPlatform.EXTERNAL diff --git a/pyproject.toml b/pyproject.toml index fadffc7b..3c2d5bff 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.13.112" +version = "0.13.113" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/alation/__init__.py b/tests/alation/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/alation/expected.json b/tests/alation/expected.json new file mode 100644 index 00000000..54a54820 --- /dev/null +++ b/tests/alation/expected.json @@ -0,0 +1,180 @@ +[ + { + "descriptionAssignment": { + "assetDescriptions": [ + { + "description": "some description" + } + ], + "columnDescriptionAssignments": [ + { + "assetDescriptions": [ + { + "description": "col1 description" + } + ], + "columnName": "col1" + }, + { + "assetDescriptions": [ + { + "description": "col2 description" + } + ], + "columnName": "col2" + } + ] + }, + "entityType": "DATASET", + "logicalId": { + "name": "baz.bar.foo", + "platform": "SNOWFLAKE" + }, + "ownershipAssignment": { + "ownerships": [ + { + "contactDesignationName": "john doe", + "person": "john.doe@acme.com" + } + ] + }, + "tagAssignment": { + "columnTagAssignments": [ + { + "columnName": "col1", + "tagNames": [ + "col1 tag1" + ] + }, + { + "columnName": "col2", + "tagNames": [ + "col2 tag 1", + "col2 tag 2" + ] + } + ], + 
"tagNames": [ + "table tag1", + "table tag2", + "table tag3" + ] + } + }, + { + "descriptionAssignment": { + "assetDescriptions": [ + { + "description": "some other description" + } + ], + "columnDescriptionAssignments": [ + { + "assetDescriptions": [ + { + "description": "col1 description" + } + ], + "columnName": "col1" + }, + { + "assetDescriptions": [ + { + "description": "col2 description" + } + ], + "columnName": "col2" + } + ] + }, + "entityType": "DATASET", + "logicalId": { + "name": "baz.bar.quax", + "platform": "SNOWFLAKE" + }, + "ownershipAssignment": { + "ownerships": [ + { + "contactDesignationName": "admin group", + "person": "admin@acme.com" + } + ] + }, + "tagAssignment": { + "columnTagAssignments": [ + { + "columnName": "col1", + "tagNames": [ + "col1 tag1" + ] + }, + { + "columnName": "col2", + "tagNames": [ + "col2 tag 1", + "col2 tag 2" + ] + } + ], + "tagNames": [ + "table tag1", + "table tag2", + "table tag3" + ] + } + }, + { + "descriptionAssignment": { + "assetDescriptions": [ + { + "description": "yet another description" + } + ], + "columnDescriptionAssignments": [ + { + "assetDescriptions": [ + { + "description": "col1 description" + } + ], + "columnName": "col1" + }, + { + "assetDescriptions": [ + { + "description": "col2 description" + } + ], + "columnName": "col2" + } + ] + }, + "entityType": "DATASET", + "logicalId": { + "name": "baz.bar.quazz", + "platform": "SNOWFLAKE" + }, + "tagAssignment": { + "columnTagAssignments": [ + { + "columnName": "col1", + "tagNames": [ + "col1 tag1" + ] + }, + { + "columnName": "col2", + "tagNames": [ + "col2 tag 1", + "col2 tag 2" + ] + } + ], + "tagNames": [ + "table tag1", + "table tag2", + "table tag3" + ] + } + } +] diff --git a/tests/alation/test_client.py b/tests/alation/test_client.py new file mode 100644 index 00000000..d812f012 --- /dev/null +++ b/tests/alation/test_client.py @@ -0,0 +1,22 @@ +import json +from unittest.mock import MagicMock, patch + +from requests import Response + +from 
metaphor.alation.client import Client + + +@patch("requests.get") +def test_client(mock_requests_get: MagicMock) -> None: + client = Client( + base_url="http://alation-instance", headers={"Token": "woo an awesome token"} + ) + resp1 = Response() + resp1._content = json.dumps([{"id": x} for x in range(11)]).encode() + resp1.status_code = 200 + resp1.headers = {"X-Next-Page": "http://bogus-url"} # type: ignore + resp2 = Response() + resp2._content = json.dumps([{"id": x} for x in range(11, 18)]).encode() + resp2.status_code = 200 + mock_requests_get.side_effect = [resp1, resp2] + assert sorted(obj["id"] for obj in client.get("")) == list(range(18)) diff --git a/tests/alation/test_config.py b/tests/alation/test_config.py new file mode 100644 index 00000000..4f5b563e --- /dev/null +++ b/tests/alation/test_config.py @@ -0,0 +1,19 @@ +from metaphor.alation.config import AlationConfig +from metaphor.common.base_config import OutputConfig +from metaphor.models.metadata_change_event import DataPlatform + + +def test_config() -> None: + config = AlationConfig( + output=OutputConfig(), + url="url", + token="token", + snowflake_account="snowflake account", + mssql_account="mssql account", + synapse_account="synapse account", + ) + + assert config.get_account(DataPlatform.SNOWFLAKE) == "snowflake account" + assert config.get_account(DataPlatform.MSSQL) == "mssql account" + assert config.get_account(DataPlatform.SYNAPSE) == "synapse account" + assert not config.get_account(DataPlatform.BIGQUERY) diff --git a/tests/alation/test_extractor.py b/tests/alation/test_extractor.py new file mode 100644 index 00000000..cf70661e --- /dev/null +++ b/tests/alation/test_extractor.py @@ -0,0 +1,179 @@ +from typing import Any, Dict, Optional + +import pytest + +from metaphor.alation.config import AlationConfig +from metaphor.alation.extractor import AlationExtractor +from metaphor.alation.schema import ( + Column, + CustomField, + Datasource, + Schema, + Steward, + Table, +) +from 
metaphor.common.base_config import OutputConfig +from metaphor.common.event_util import EventUtil +from tests.test_utils import load_json + + +class MockClient: + def __init__(self) -> None: + pass + + def get(self, path: str, params: Optional[Dict[str, Any]] = None): # noqa: C901 + if path == "integration/v2/table/": + yield Table( + id=1, + name="foo", + description="some description", + ds_id=2, + table_type="TABLE", + schema_id=3, + custom_fields=[ + CustomField( + value=[Steward(oid=10, otype="user")], + field_id=6, + field_name="Steward", + ) + ], + ).model_dump() + yield Table( + id=7, + name="quax", + description="some other description", + ds_id=2, + table_type="TABLE", + schema_id=3, + custom_fields=[ + CustomField( + value=[Steward(oid=11, otype="groupprofile")], + field_id=8, + field_name="Steward", + ) + ], + ).model_dump() + yield Table( + id=9, + name="quazz", + description="yet another description", + ds_id=2, + table_type="TABLE", + schema_id=3, + ).model_dump() + yield Table( + id=12, + name="ocf", + description="table from ocf datasource, should not be parsed", + ds_id=13, + table_type="TABLE", + schema_id=3, + ).model_dump() + yield Table( + id=14, + name="teradata", + description="table from datasource with unsupported dbtype, should not be parsed", + ds_id=15, + table_type="TABLE", + schema_id=3, + ).model_dump() + return + + if "integration/v1/group/" in path: + yield { + "display_name": "admin group", + "email": "admin@acme.com", + } + return + + if "integration/v1/user/" in path: + yield { + "display_name": "john doe", + "email": "john.doe@acme.com", + } + return + + if path == "integration/v2/schema/": + yield Schema( + ds_id=2, + key="2.bar", + ).model_dump() + return + + if "integration/v1/datasource/2" in path: + yield Datasource( + dbtype="snowflake", + dbname="baz", + ).model_dump() + return + + if "integration/v1/datasource/15" in path: + yield Datasource( + dbtype="teradata", + dbname="teradata", + ).model_dump() + return + + if 
"integration/v2/datasource/" in path: + yield Datasource( + dbtype="snowflake", + dbname="ocf", + ).model_dump() + return + + if path == "integration/v2/column/": + for col in [ + Column( + id=4, + name="col1", + description="col1 description", + ), + Column( + id=5, + name="col2", + description="col2 description", + ), + ]: + yield col.model_dump() + return + + if path == "integration/tag/": + assert params + if params["otype"] == "attribute": + if params["oid"] == 4: + yield {"name": "col1 tag1"} + return + + for tag in [{"name": "col2 tag 1"}, {"name": "col2 tag 2"}]: + yield tag + return + + for tag in [ + { + "name": "table tag1", + }, + { + "name": "table tag2", + }, + { + "name": "table tag3", + }, + ]: + yield tag + return + + +@pytest.mark.asyncio +async def test_extractor( + test_root_dir: str, +) -> None: + config = AlationConfig( + output=OutputConfig(), + url="http://alation-instance", + token="token", + ) + extractor = AlationExtractor(config) + extractor._client = MockClient() # type: ignore + events = [EventUtil.trim_event(e) for e in await extractor.extract()] + expected = f"{test_root_dir}/alation/expected.json" + assert events == load_json(expected)