Commit

Alation crawler (#761)
* Alation crawler

* add extractor test

* add test for client

* add test for config

* bump version

* add readme

* parse steward to owner

* address comment

* skip tables from unsupported datasources

* bump version

* bump version

* bump version
usefulalgorithm authored Jan 25, 2024
1 parent 3990b6c commit 366a12a
Showing 12 changed files with 792 additions and 1 deletion.
57 changes: 57 additions & 0 deletions metaphor/alation/README.md
@@ -0,0 +1,57 @@
# Alation Connector

This connector extracts user-generated metadata (descriptions, tags, and stewards) from Alation.

## Setup

To run the connector, you must have an API token. Follow [the official documentation](https://developer.alation.com/dev/docs/authentication-into-alation-apis) to generate one.
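
The connector passes this token in the `Token` HTTP header on every request (see `client.py` in this commit). As a minimal smoke test that your token works, with the instance URL and token as placeholders you must fill in:

```python
import requests

# Hypothetical check: fetch a single table using the same header and
# endpoint the connector itself uses.
response = requests.get(
    "https://<your-instance>/integration/v2/table/",
    headers={"Token": "<API token>"},
    params={"limit": 1},
    timeout=10,
)
response.raise_for_status()
print(response.json())
```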

## Config File

Create a YAML config file based on the following template.

### Required Configurations

```yaml
url: <url to the Alation instance>
token: <API token>
output:
file:
directory: <output_directory>
```

See [Output Config](../common/docs/output.md) for more information on `output`.

### Optional Configurations

#### Snowflake, MSSQL, and Synapse Accounts

If there are data sources from Snowflake, MSSQL, or Synapse, provide their accounts as follows:

```yaml
snowflake_account: <snowflake_account_name>
mssql_account: <mssql_account_name>
synapse_account: <synapse_account_name>
```

#### Description Author Email

Alation does not keep track of description authors. You can specify the description author email in the configuration file:

```yaml
description_author_email: <email>
```

If not provided, each dataset's first owner will be considered the author. If a dataset has no owner, the placeholder email `[email protected]` will be used.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). Make sure to include either the `all` or `alation` extra.

To test the connector locally, change the config file to output to a local path and run the following command:

```shell
metaphor alation <config_file>
```

Manually verify the output after the run finishes.
6 changes: 6 additions & 0 deletions metaphor/alation/__init__.py
@@ -0,0 +1,6 @@
from metaphor.alation.extractor import AlationExtractor
from metaphor.common.cli import cli_main


def main(config_file: str):
cli_main(AlationExtractor, config_file)
24 changes: 24 additions & 0 deletions metaphor/alation/client.py
@@ -0,0 +1,24 @@
from typing import Any, Dict, Iterator, Optional

import requests


class Client:
def __init__(self, base_url: str, headers: Dict[str, str]) -> None:
self.base_url = base_url
self.headers = headers

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Iterator[Any]:
        url: Optional[str] = f"{self.base_url}/{path}"
        # Follow Alation's pagination: each response may point at the next page
        # via the `X-Next-Page` response header; stop when it is absent.
        while url:
            response = requests.get(
                url,
                params=params,
                headers=self.headers,
                timeout=10,
            )
            response.raise_for_status()
            url = response.headers.get("X-Next-Page")
            # `response.json()` is a list of objects; yield them one at a time.
            for obj in response.json():
                yield obj
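
For reference, a minimal usage sketch of this client (URL and token are placeholders): `get` yields objects lazily and follows pagination internally, so callers can iterate across page boundaries transparently.

```python
# Illustrative only: iterate every table, 100 per page.
client = Client("https://alation.example.com", headers={"Token": "<API token>"})
for table_obj in client.get("integration/v2/table/", params={"limit": 100}):
    print(table_obj["name"])
```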
27 changes: 27 additions & 0 deletions metaphor/alation/config.py
@@ -0,0 +1,27 @@
from typing import Optional

from pydantic.dataclasses import dataclass

from metaphor.common.base_config import BaseConfig
from metaphor.common.dataclass import ConnectorConfig
from metaphor.models.metadata_change_event import DataPlatform


@dataclass(config=ConnectorConfig)
class AlationConfig(BaseConfig):
url: str
token: str

description_author_email: Optional[str] = None
snowflake_account: Optional[str] = None
mssql_account: Optional[str] = None
synapse_account: Optional[str] = None

def get_account(self, data_platform: DataPlatform) -> Optional[str]:
if data_platform is DataPlatform.SNOWFLAKE:
return self.snowflake_account
if data_platform is DataPlatform.MSSQL:
return self.mssql_account
if data_platform is DataPlatform.SYNAPSE:
return self.synapse_account
return None
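
As a usage sketch, `get_account` centralizes the platform-to-account lookup so callers avoid a chain of conditionals (the config path here is a placeholder):

```python
from metaphor.models.metadata_change_event import DataPlatform

config = AlationConfig.from_yaml_file("config.yml")  # inherited from BaseConfig
account = config.get_account(DataPlatform.SNOWFLAKE)  # snowflake_account, or None
```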
162 changes: 162 additions & 0 deletions metaphor/alation/extractor.py
@@ -0,0 +1,162 @@
from typing import Any, Collection, Dict, List, Literal, Optional

from metaphor.alation.client import Client
from metaphor.alation.config import AlationConfig
from metaphor.alation.schema import Column, Datasource, Schema, Steward, Table
from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger
from metaphor.models.metadata_change_event import (
ColumnTagAssignment,
DataPlatform,
Dataset,
DatasetLogicalID,
EntityType,
Ownership,
OwnershipAssignment,
TagAssignment,
)

logger = get_logger()


class AlationExtractor(BaseExtractor):
"""Alation metadata extractor"""

@staticmethod
def from_config_file(config_file: str) -> "AlationExtractor":
return AlationExtractor(AlationConfig.from_yaml_file(config_file))

_description = "Alation metadata connector"
_platform = None

def __init__(self, config: AlationConfig) -> None:
super().__init__(config)
self._config = config
self._client = Client(config.url, {"Token": config.token})
self._entities: List[ENTITY_TYPES] = []
self._schemas: Dict[int, str] = {} # schema_id to name
self._datasources: Dict[int, Datasource] = {} # ds_id to datasource

def _get_schema(self, schema_id: int) -> str:
if schema_id not in self._schemas:
schema_obj = next(
self._client.get("integration/v2/schema/", {"id": schema_id}), None
)
if not schema_obj:
raise ValueError(f"No schema found with id = {schema_id}")
schema = Schema.model_validate(schema_obj)
self._schemas[schema_id] = schema.qualified_name

return self._schemas[schema_id]

def _get_datasource(self, ds_id: int):
if ds_id not in self._datasources:
ds_obj = next(self._client.get(f"integration/v1/datasource/{ds_id}/"), None)
if not ds_obj:
if (
next(self._client.get(f"integration/v2/datasource/{ds_id}/"), None)
is not None
):
# TODO: support OCF datasources
raise RuntimeError("OCF datasources are currently not supported")

raise ValueError(f"No datasource found with id = {ds_id}")
datasource = Datasource.model_validate(ds_obj)
self._datasources[ds_id] = datasource
return self._datasources[ds_id]

def _get_table_columns(self, table_id: int):
return [
Column.model_validate(column_obj)
for column_obj in self._client.get(
"integration/v2/column/", params={"table_id": table_id}
)
]

def _get_tags(self, oid: int, otype: Literal["attribute", "table"]):
return [
str(tag_obj["name"])
for tag_obj in self._client.get(
"integration/tag/", params={"oid": oid, "otype": otype}
)
]

def _get_tag_assignment(self, table: Table, columns: List[Column]):
tag_assignment = None
table_tags = self._get_tags(oid=table.id, otype="table")
column_tag_assignments: List[ColumnTagAssignment] = []
for column in columns:
column_tags = self._get_tags(column.id, otype="attribute")
if column_tags:
column_tag_assignments.append(
ColumnTagAssignment(
column_name=column.name,
tag_names=column_tags,
)
)
if table_tags or column_tag_assignments:
tag_assignment = TagAssignment(
column_tag_assignments=column_tag_assignments
if column_tag_assignments
else None,
tag_names=table_tags,
)
return tag_assignment

def _get_ownership_assignment(self, steward: Optional[Steward]):
if not steward:
return None

if steward.otype == "groupprofile":
path = f"integration/v1/group/{steward.oid}"
else:
path = f"integration/v1/user/{steward.oid}"

owner = next(self._client.get(path), None)
if not owner:
raise ValueError(
f"No owner found for this steward, steward id = {steward.oid}, type =- {steward.otype}"
)

return OwnershipAssignment(
ownerships=[
Ownership(
contact_designation_name=owner["display_name"],
person=owner["email"],
)
]
)

def _init_dataset(self, table_obj: Any) -> Dataset:
table = Table.model_validate(table_obj)
schema = self._get_schema(table.schema_id)
datasource = self._get_datasource(table.ds_id)
if datasource.platform == DataPlatform.EXTERNAL:
raise RuntimeError(f"Unsupported datasource dbtype: {datasource.dbtype}")
columns = self._get_table_columns(table.id)
name = dataset_normalized_name(datasource.dbname, schema, table.name)
dataset = Dataset(
entity_type=EntityType.DATASET,
logical_id=DatasetLogicalID(
account=self._config.get_account(datasource.platform),
name=name,
platform=datasource.platform,
),
description_assignment=table.description_assignment(
self._config.description_author_email, columns
),
tag_assignment=self._get_tag_assignment(table, columns),
ownership_assignment=self._get_ownership_assignment(table.steward),
)
return dataset

async def extract(self) -> Collection[ENTITY_TYPES]:
for table_obj in self._client.get("integration/v2/table/", {"limit": 100}):
try:
self._entities.append(self._init_dataset(table_obj))
except Exception:
logger.exception(f"Cannot extract table, obj = {table_obj}")

return self._entities
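
Outside the `metaphor alation <config_file>` CLI entry point, the extractor can also be driven directly; a minimal sketch, assuming a config file like the one shown in the README:

```python
import asyncio

from metaphor.alation.extractor import AlationExtractor

# Illustrative: build the extractor from a config file and collect entities.
extractor = AlationExtractor.from_config_file("config.yml")
entities = asyncio.run(extractor.extract())
print(f"Extracted {len(entities)} datasets")
```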
115 changes: 115 additions & 0 deletions metaphor/alation/schema.py
@@ -0,0 +1,115 @@
from typing import Any, List, Literal, Optional

from pydantic import BaseModel, Field

from metaphor.models.metadata_change_event import (
AssetDescription,
ColumnDescriptionAssignment,
DataPlatform,
DescriptionAssignment,
)


class CustomField(BaseModel):
value: Any
field_id: int
field_name: str


class Steward(BaseModel):
oid: int
otype: Literal["user", "groupprofile"]


class Column(BaseModel):
id: int
name: str
description: Optional[str] = None
custom_fields: List[CustomField] = Field(default_factory=list)


class Table(BaseModel):
id: int
name: str
description: Optional[str] = None
ds_id: int
custom_fields: List[CustomField] = Field(default_factory=list)
table_type: str
schema_id: int

@property
def steward(self) -> Optional[Steward]:
steward_obj = next(
(f.value[0] for f in self.custom_fields if f.field_name == "Steward"), None
)
if steward_obj:
steward = Steward.model_validate(steward_obj)
return steward
return None

def description_assignment(self, author: Optional[str], columns: List[Column]):
asset_descriptions: Optional[List[AssetDescription]] = None
if self.description:
asset_descriptions = [
AssetDescription(author=author, description=self.description)
]

column_description_assignments: List[ColumnDescriptionAssignment] = []
for column in columns:
if column.description:
column_description_assignments.append(
ColumnDescriptionAssignment(
asset_descriptions=[
AssetDescription(
author=author, description=column.description
)
],
column_name=column.name,
)
)

description_assignment: Optional[DescriptionAssignment] = None
if asset_descriptions or column_description_assignments:
description_assignment = DescriptionAssignment(
asset_descriptions=asset_descriptions,
column_description_assignments=column_description_assignments
if column_description_assignments
else None,
)

return description_assignment


class Schema(BaseModel):
ds_id: int
key: str
"""
`ds_id`.`qualified_name`
"""

@property
def qualified_name(self) -> str:
# Qualified name of the schema
return self.key[len(str(self.ds_id)) + 1 :]


class Datasource(BaseModel):
dbtype: str
"""
Currently the certified types are mysql, oracle, postgresql, sqlserver, redshift, teradata and snowflake
"""
dbname: Optional[str] = None

@property
def platform(self) -> DataPlatform:
if self.dbtype == "mysql":
return DataPlatform.MYSQL
if self.dbtype == "postgresql":
return DataPlatform.POSTGRESQL
if self.dbtype == "sqlserver":
return DataPlatform.MSSQL
if self.dbtype == "redshift":
return DataPlatform.REDSHIFT
if self.dbtype == "snowflake":
return DataPlatform.SNOWFLAKE
return DataPlatform.EXTERNAL
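
To illustrate how these models fit together, here is a sketch that parses a made-up table payload and reads the derived properties:

```python
# Illustrative payload; all values are invented.
table = Table.model_validate(
    {
        "id": 1,
        "name": "orders",
        "ds_id": 7,
        "schema_id": 3,
        "table_type": "TABLE",
        "custom_fields": [
            {
                "value": [{"oid": 42, "otype": "user"}],
                "field_id": 8,
                "field_name": "Steward",
            }
        ],
    }
)
assert table.steward is not None and table.steward.oid == 42

schema = Schema.model_validate({"ds_id": 7, "key": "7.sales"})
assert schema.qualified_name == "sales"  # strips the leading "<ds_id>."
```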