Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Athena crawler #995

Merged
merged 8 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Each connector is placed under its own directory under [metaphor](./metaphor) an

| Connector Name | Metadata |
|-------------------------------------------------------------------|------------------------------------------|
| [athena](metaphor/athena/) | Schema, description, queries |
| [azure_data_factory](metaphor/azure_data_factory/) | Lineage, Pipeline |
| [bigquery](metaphor/bigquery/) | Schema, description, statistics, queries |
| [bigquery.lineage](metaphor/bigquery/lineage/) | Lineage |
Expand Down
67 changes: 67 additions & 0 deletions metaphor/athena/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Athena Connector

This connector extracts technical metadata and query history from AWS Athena using the [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) library.

## Setup

We recommend creating a dedicated AWS IAM user for the crawler with limited permissions based on the following IAM policy:

``` json
{
"Version": "2012-10-17",
"Statement":
[
{
"Effect": "Allow",
"Action":
[
"athena:ListDataCatalogs",
"athena:ListDatabases",
"athena:ListTableMetadata",
"athena:ListQueryExecutions",
"athena:BatchGetQueryExecution",
"glue:GetDatabases",
"glue:GetTables"
],
"Resource":
[
"*"
]
}
]
}
```

## Config File

Create a YAML config file based on the following template.

### Required Configurations

You must specify an AWS user credential to access QuickSight API. You can also specify a role ARN and let the connector assume the role before accessing AWS APIs.

```yaml
aws:
access_key_id: <aws_access_key_id>
secret_access_key: <aws_secret_access_key>
region_name: <aws_region_name>
assume_role_arn: <aws_role_arn> # If using IAM role
```

### Optional Configurations

#### Output Destination

See [Output Config](../common/docs/output.md) for more information.

## Testing

Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv).

Run the following command to test the connector locally:

```shell
metaphor athena <config_file>
```

Manually verify the output after the run finishes.
6 changes: 6 additions & 0 deletions metaphor/athena/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from metaphor.athena.extractor import AthenaExtractor
from metaphor.common.cli import cli_main


def main(config_file: str):
cli_main(AthenaExtractor, config_file)
16 changes: 16 additions & 0 deletions metaphor/athena/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from dataclasses import field

from pydantic.dataclasses import dataclass

from metaphor.common.aws import AwsCredentials
from metaphor.common.base_config import BaseConfig
from metaphor.common.dataclass import ConnectorConfig
from metaphor.common.filter import DatasetFilter


@dataclass(config=ConnectorConfig)
class AthenaRunConfig(BaseConfig):
aws: AwsCredentials

# Include or exclude specific databases/schemas/tables
filter: DatasetFilter = field(default_factory=lambda: DatasetFilter())
229 changes: 229 additions & 0 deletions metaphor/athena/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
from typing import Collection, Dict, Iterator, List

import boto3

from metaphor.athena.config import AthenaRunConfig, AwsCredentials
from metaphor.athena.models import (
BatchGetQueryExecutionResponse,
TableMetadata,
TableTypeEnum,
)
from metaphor.common.base_extractor import BaseExtractor
from metaphor.common.entity_id import dataset_normalized_name
from metaphor.common.event_util import ENTITY_TYPES
from metaphor.common.logger import get_logger, json_dump_to_debug_file
from metaphor.common.sql.table_level_lineage.table_level_lineage import (
extract_table_level_lineage,
)
from metaphor.common.utils import chunks, md5_digest, to_utc_time
from metaphor.models.crawler_run_metadata import Platform
from metaphor.models.metadata_change_event import (
DataPlatform,
Dataset,
DatasetLogicalID,
DatasetSchema,
DatasetStructure,
MaterializationType,
QueryLog,
SchemaField,
SchemaType,
SourceInfo,
SQLSchema,
)

logger = get_logger()


def create_athena_client(aws: AwsCredentials) -> boto3.client:
return aws.get_session().client("athena")

Check warning on line 38 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L38

Added line #L38 was not covered by tests


SUPPORTED_CATALOG_TYPE = ("GLUE", "HIVE")


class AthenaExtractor(BaseExtractor):
"""Athena metadata extractor"""

_description = "Athena metadata crawler"
_platform = Platform.ATHENA

@staticmethod
def from_config_file(config_file: str) -> "AthenaExtractor":
return AthenaExtractor(AthenaRunConfig.from_yaml_file(config_file))

Check warning on line 52 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L52

Added line #L52 was not covered by tests

def __init__(self, config: AthenaRunConfig) -> None:
super().__init__(config)
self._datasets: Dict[str, Dataset] = {}
self._aws_config = config.aws

async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info("Fetching metadata from Athena")

self._client = create_athena_client(self._aws_config)

for catalog in self._get_catalogs():
databases = self._get_databases(catalog)
for database in databases:
self._extract_tables(catalog, database)

return self._datasets.values()

def collect_query_logs(self) -> Iterator[QueryLog]:
for page in self._paginate_and_dump_response("list_query_executions"):
ids = page["QueryExecutionIds"]
yield from self._batch_get_queries(ids)

def _get_catalogs(self):
database_names = []
for page in self._paginate_and_dump_response("list_data_catalogs"):
for item in page["DataCatalogsSummary"]:
if item["Type"] not in SUPPORTED_CATALOG_TYPE:
continue

Check warning on line 81 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L81

Added line #L81 was not covered by tests
database_names.append(item["CatalogName"])

return database_names

def _get_databases(self, catalog: str):
database_names = []
for page in self._paginate_and_dump_response(
"list_databases", CatalogName=catalog
):
for database in page["DatabaseList"]:
database_names.append(database["Name"])

return database_names

def _extract_tables(self, catalog: str, database: str):
for page in self._paginate_and_dump_response(
"list_table_metadata", CatalogName=catalog, DatabaseName=database
):
for table_metadata in page["TableMetadataList"]:
self._init_dataset(catalog, database, TableMetadata(**table_metadata))

def _paginate_and_dump_response(self, api_endpoint: str, **paginator_args):
paginator = self._client.get_paginator(api_endpoint)
paginator_response = paginator.paginate(**paginator_args)

for page in paginator_response:
request_id = page["ResponseMetadata"]["RequestId"]
json_dump_to_debug_file(page, f"{api_endpoint}_{request_id}.json")
yield page

def _init_dataset(self, catalog: str, database: str, table_metadata: TableMetadata):
name = dataset_normalized_name(
db=catalog, schema=database, table=table_metadata.Name
)

table_type = (
TableTypeEnum(table_metadata.TableType)
if table_metadata.TableType
and table_metadata.TableType in (enum.value for enum in TableTypeEnum)
else None
)

dataset = Dataset(
logical_id=DatasetLogicalID(
name=name,
platform=DataPlatform.ATHENA,
),
source_info=SourceInfo(
created_at_source=table_metadata.CreateTime,
),
schema=(
DatasetSchema(
fields=[
SchemaField(
description=column.Comment,
field_name=column.Name,
field_path=column.Name.lower(),
native_type=column.Type,
)
for column in table_metadata.Columns
],
schema_type=SchemaType.SQL,
sql_schema=SQLSchema(
materialization=(
MaterializationType.TABLE
if table_type == TableTypeEnum.EXTERNAL_TABLE
else (
MaterializationType.VIEW
if table_type == TableTypeEnum.VIRTUAL_VIEW
else None
)
)
),
)
if table_metadata.Columns
else None
),
structure=DatasetStructure(
database=catalog,
schema=database,
table=table_metadata.Name,
),
)

self._datasets[name] = dataset

def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]:
query_logs: List[QueryLog] = []
for ids in chunks(query_execution_ids, 50):
raw_response = self._client.batch_get_query_execution(QueryExecutionIds=ids)
request_id = raw_response["ResponseMetadata"]["RequestId"]
json_dump_to_debug_file(
raw_response, f"batch_get_query_execution_{request_id}.json"
)

response = BatchGetQueryExecutionResponse(**raw_response)
for unprocessed in response.UnprocessedQueryExecutionIds or []:
logger.warning(

Check warning on line 179 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L179

Added line #L179 was not covered by tests
f"id: {unprocessed.QueryExecutionId}, msg: {unprocessed.ErrorMessage}"
)

for query_execution in response.QueryExecutions or []:
if query_execution.StatementType == "UTILITY":
# Skip utility query, e.g. DESC TABLE
continue

Check warning on line 186 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L186

Added line #L186 was not covered by tests

query = query_execution.Query
if not query:
continue

Check warning on line 190 in metaphor/athena/extractor.py

View check run for this annotation

Codecov / codecov/patch

metaphor/athena/extractor.py#L190

Added line #L190 was not covered by tests

context = query_execution.QueryExecutionContext
database, schema = (
(context.Catalog, context.Database) if context else (None, None)
)

tll = extract_table_level_lineage(
sql=query,
platform=DataPlatform.ATHENA,
account=None,
default_database=database,
default_schema=schema,
)

start_time = (
to_utc_time(query_execution.Status.SubmissionDateTime)
if query_execution.Status
and query_execution.Status.SubmissionDateTime
else None
)

query_logs.append(
QueryLog(
duration=(
query_execution.Statistics.TotalExecutionTimeInMillis
if query_execution.Statistics
else None
),
platform=DataPlatform.ATHENA,
query_id=query_execution.QueryExecutionId,
sources=tll.sources,
targets=tll.targets,
sql=query,
sql_hash=md5_digest(query.encode("utf-8")),
start_time=start_time,
)
)

return query_logs
Loading
Loading