diff --git a/README.md b/README.md index 6dca611c..02e0a346 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,7 @@ Each connector is placed under its own directory under [metaphor](./metaphor) an | Connector Name | Metadata | |-------------------------------------------------------------------|------------------------------------------| +| [athena](metaphor/athena/) | Schema, description, queries | | [azure_data_factory](metaphor/azure_data_factory/) | Lineage, Pipeline | | [bigquery](metaphor/bigquery/) | Schema, description, statistics, queries | | [bigquery.lineage](metaphor/bigquery/lineage/) | Lineage | diff --git a/metaphor/athena/README.md b/metaphor/athena/README.md new file mode 100644 index 00000000..1e0caeb4 --- /dev/null +++ b/metaphor/athena/README.md @@ -0,0 +1,67 @@ +# Athena Connector + +This connector extracts technical metadata and query history from AWS Athena using the [boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html) library. + +## Setup + +We recommend creating a dedicated AWS IAM user for the crawler with limited permissions based on the following IAM policy: + +``` json +{ + "Version": "2012-10-17", + "Statement": + [ + { + "Effect": "Allow", + "Action": + [ + "athena:ListDataCatalogs", + "athena:ListDatabases", + "athena:ListTableMetadata", + "athena:ListQueryExecutions", + "athena:BatchGetQueryExecution", + "glue:GetDatabases", + "glue:GetTables" + ], + "Resource": + [ + "*" + ] + } + ] +} +``` + +## Config File + +Create a YAML config file based on the following template. + +### Required Configurations + +You must specify an AWS user credential to access the Athena API. You can also specify a role ARN and let the connector assume the role before accessing AWS APIs. + +```yaml +aws: + access_key_id: + secret_access_key: + region_name: + assume_role_arn: # If using IAM role +``` + +### Optional Configurations + +#### Output Destination + +See [Output Config](../common/docs/output.md) for more information. 
+ +## Testing + +Follow the [Installation](../../README.md) instructions to install `metaphor-connectors` in your environment (or virtualenv). + +Run the following command to test the connector locally: + +```shell +metaphor athena <config_file> +``` + +Manually verify the output after the run finishes. diff --git a/metaphor/athena/__init__.py b/metaphor/athena/__init__.py new file mode 100644 index 00000000..cdf31ac7 --- /dev/null +++ b/metaphor/athena/__init__.py @@ -0,0 +1,6 @@ +from metaphor.athena.extractor import AthenaExtractor +from metaphor.common.cli import cli_main + + +def main(config_file: str): + cli_main(AthenaExtractor, config_file) diff --git a/metaphor/athena/config.py b/metaphor/athena/config.py new file mode 100644 index 00000000..288541f9 --- /dev/null +++ b/metaphor/athena/config.py @@ -0,0 +1,16 @@ +from dataclasses import field + +from pydantic.dataclasses import dataclass + +from metaphor.common.aws import AwsCredentials +from metaphor.common.base_config import BaseConfig +from metaphor.common.dataclass import ConnectorConfig +from metaphor.common.filter import DatasetFilter + + +@dataclass(config=ConnectorConfig) +class AthenaRunConfig(BaseConfig): + aws: AwsCredentials + + # Include or exclude specific databases/schemas/tables + filter: DatasetFilter = field(default_factory=lambda: DatasetFilter()) diff --git a/metaphor/athena/extractor.py b/metaphor/athena/extractor.py new file mode 100644 index 00000000..760a153d --- /dev/null +++ b/metaphor/athena/extractor.py @@ -0,0 +1,229 @@ +from typing import Collection, Dict, Iterator, List + +import boto3 + +from metaphor.athena.config import AthenaRunConfig, AwsCredentials +from metaphor.athena.models import ( + BatchGetQueryExecutionResponse, + TableMetadata, + TableTypeEnum, +) +from metaphor.common.base_extractor import BaseExtractor +from metaphor.common.entity_id import dataset_normalized_name +from metaphor.common.event_util import ENTITY_TYPES +from metaphor.common.logger import get_logger, 
json_dump_to_debug_file +from metaphor.common.sql.table_level_lineage.table_level_lineage import ( + extract_table_level_lineage, +) +from metaphor.common.utils import chunks, md5_digest, to_utc_time +from metaphor.models.crawler_run_metadata import Platform +from metaphor.models.metadata_change_event import ( + DataPlatform, + Dataset, + DatasetLogicalID, + DatasetSchema, + DatasetStructure, + MaterializationType, + QueryLog, + SchemaField, + SchemaType, + SourceInfo, + SQLSchema, +) + +logger = get_logger() + + +def create_athena_client(aws: AwsCredentials) -> boto3.client: + return aws.get_session().client("athena") + + +SUPPORTED_CATALOG_TYPE = ("GLUE", "HIVE") + + +class AthenaExtractor(BaseExtractor): + """Athena metadata extractor""" + + _description = "Athena metadata crawler" + _platform = Platform.ATHENA + + @staticmethod + def from_config_file(config_file: str) -> "AthenaExtractor": + return AthenaExtractor(AthenaRunConfig.from_yaml_file(config_file)) + + def __init__(self, config: AthenaRunConfig) -> None: + super().__init__(config) + self._datasets: Dict[str, Dataset] = {} + self._aws_config = config.aws + + async def extract(self) -> Collection[ENTITY_TYPES]: + logger.info("Fetching metadata from Athena") + + self._client = create_athena_client(self._aws_config) + + for catalog in self._get_catalogs(): + databases = self._get_databases(catalog) + for database in databases: + self._extract_tables(catalog, database) + + return self._datasets.values() + + def collect_query_logs(self) -> Iterator[QueryLog]: + for page in self._paginate_and_dump_response("list_query_executions"): + ids = page["QueryExecutionIds"] + yield from self._batch_get_queries(ids) + + def _get_catalogs(self): + database_names = [] + for page in self._paginate_and_dump_response("list_data_catalogs"): + for item in page["DataCatalogsSummary"]: + if item["Type"] not in SUPPORTED_CATALOG_TYPE: + continue + database_names.append(item["CatalogName"]) + + return database_names + + def 
_get_databases(self, catalog: str): + database_names = [] + for page in self._paginate_and_dump_response( + "list_databases", CatalogName=catalog + ): + for database in page["DatabaseList"]: + database_names.append(database["Name"]) + + return database_names + + def _extract_tables(self, catalog: str, database: str): + for page in self._paginate_and_dump_response( + "list_table_metadata", CatalogName=catalog, DatabaseName=database + ): + for table_metadata in page["TableMetadataList"]: + self._init_dataset(catalog, database, TableMetadata(**table_metadata)) + + def _paginate_and_dump_response(self, api_endpoint: str, **paginator_args): + paginator = self._client.get_paginator(api_endpoint) + paginator_response = paginator.paginate(**paginator_args) + + for page in paginator_response: + request_id = page["ResponseMetadata"]["RequestId"] + json_dump_to_debug_file(page, f"{api_endpoint}_{request_id}.json") + yield page + + def _init_dataset(self, catalog: str, database: str, table_metadata: TableMetadata): + name = dataset_normalized_name( + db=catalog, schema=database, table=table_metadata.Name + ) + + table_type = ( + TableTypeEnum(table_metadata.TableType) + if table_metadata.TableType + and table_metadata.TableType in (enum.value for enum in TableTypeEnum) + else None + ) + + dataset = Dataset( + logical_id=DatasetLogicalID( + name=name, + platform=DataPlatform.ATHENA, + ), + source_info=SourceInfo( + created_at_source=table_metadata.CreateTime, + ), + schema=( + DatasetSchema( + fields=[ + SchemaField( + description=column.Comment, + field_name=column.Name, + field_path=column.Name.lower(), + native_type=column.Type, + ) + for column in table_metadata.Columns + ], + schema_type=SchemaType.SQL, + sql_schema=SQLSchema( + materialization=( + MaterializationType.TABLE + if table_type == TableTypeEnum.EXTERNAL_TABLE + else ( + MaterializationType.VIEW + if table_type == TableTypeEnum.VIRTUAL_VIEW + else None + ) + ) + ), + ) + if table_metadata.Columns + else None + 
), + structure=DatasetStructure( + database=catalog, + schema=database, + table=table_metadata.Name, + ), + ) + + self._datasets[name] = dataset + + def _batch_get_queries(self, query_execution_ids: List[str]) -> List[QueryLog]: + query_logs: List[QueryLog] = [] + for ids in chunks(query_execution_ids, 50): + raw_response = self._client.batch_get_query_execution(QueryExecutionIds=ids) + request_id = raw_response["ResponseMetadata"]["RequestId"] + json_dump_to_debug_file( + raw_response, f"batch_get_query_execution_{request_id}.json" + ) + + response = BatchGetQueryExecutionResponse(**raw_response) + for unprocessed in response.UnprocessedQueryExecutionIds or []: + logger.warning( + f"id: {unprocessed.QueryExecutionId}, msg: {unprocessed.ErrorMessage}" + ) + + for query_execution in response.QueryExecutions or []: + if query_execution.StatementType == "UTILITY": + # Skip utility query, e.g. DESC TABLE + continue + + query = query_execution.Query + if not query: + continue + + context = query_execution.QueryExecutionContext + database, schema = ( + (context.Catalog, context.Database) if context else (None, None) + ) + + tll = extract_table_level_lineage( + sql=query, + platform=DataPlatform.ATHENA, + account=None, + default_database=database, + default_schema=schema, + ) + + start_time = ( + to_utc_time(query_execution.Status.SubmissionDateTime) + if query_execution.Status + and query_execution.Status.SubmissionDateTime + else None + ) + + query_logs.append( + QueryLog( + duration=( + query_execution.Statistics.TotalExecutionTimeInMillis + if query_execution.Statistics + else None + ), + platform=DataPlatform.ATHENA, + query_id=query_execution.QueryExecutionId, + sources=tll.sources, + targets=tll.targets, + sql=query, + sql_hash=md5_digest(query.encode("utf-8")), + start_time=start_time, + ) + ) + + return query_logs diff --git a/metaphor/athena/models.py b/metaphor/athena/models.py new file mode 100644 index 00000000..be52aa76 --- /dev/null +++ 
b/metaphor/athena/models.py @@ -0,0 +1,70 @@ +from datetime import datetime +from enum import Enum +from typing import Dict, List, Optional + +from pydantic import Field +from pydantic.dataclasses import dataclass + + +@dataclass +class ColumnMetadata: + Name: str + Type: str + Comment: Optional[str] = None + + +class TableTypeEnum(Enum): + EXTERNAL_TABLE = "EXTERNAL_TABLE" + VIRTUAL_VIEW = "VIRTUAL_VIEW" + + +@dataclass +class TableMetadata: + Name: str + CreateTime: datetime + Parameters: Dict[str, Optional[str]] = Field(default_factory=dict) + LastAccessTime: Optional[datetime] = None + Columns: List[ColumnMetadata] = Field(default_factory=list) + PartitionKeys: List[ColumnMetadata] = Field(default_factory=list) + TableType: Optional[str] = None + + +@dataclass +class QueryExecutionStatus: + State: Optional[str] = None + SubmissionDateTime: Optional[datetime] = None + + +@dataclass +class QueryExecutionStatistics: + TotalExecutionTimeInMillis: Optional[int] = None + + +@dataclass +class TypeQueryExecutionContext: + Database: Optional[str] = None + Catalog: Optional[str] = None + + +@dataclass +class QueryExecution: + QueryExecutionId: str + Query: Optional[str] = None + StatementType: Optional[str] = None + QueryExecutionContext: Optional[TypeQueryExecutionContext] = None + Status: Optional[QueryExecutionStatus] = None + Statistics: Optional[QueryExecutionStatistics] = None + SubstatementType: Optional[str] = None + + +@dataclass +class UnprocessedQueryExecutionId: + QueryExecutionId: str + ErrorCode: Optional[str] = None + ErrorMessage: Optional[str] = None + + +@dataclass +class BatchGetQueryExecutionResponse: + QueryExecutions: Optional[List[QueryExecution]] + UnprocessedQueryExecutionIds: Optional[List[UnprocessedQueryExecutionId]] diff --git a/poetry.lock b/poetry.lock index f3923bde..dd65244e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3258,13 +3258,13 @@ files = [ [[package]] name = "metaphor-models" -version = "0.40.2" +version = "0.40.3" description 
= "" optional = false python-versions = "<4.0,>=3.8" files = [ - {file = "metaphor_models-0.40.2-py3-none-any.whl", hash = "sha256:159aca34cb3ce2936e8ef3a88a65f2890f74d098a2d164a2f97c16f9f7602bed"}, - {file = "metaphor_models-0.40.2.tar.gz", hash = "sha256:81d11a2a91cc63a1f50b099e7365ee83fa1ffb964dcbb46246b4565096bbc18b"}, + {file = "metaphor_models-0.40.3-py3-none-any.whl", hash = "sha256:30ffdd0620fc4c036644187fb5ce3dcd13be0ad28c4a0ece4cc73505828f9a7e"}, + {file = "metaphor_models-0.40.3.tar.gz", hash = "sha256:e70cf0a1b44cdf4de1746c89e70ee6c832ce65788a99adfe6bdfa5abb47b92cb"}, ] [[package]] @@ -6748,4 +6748,4 @@ unity-catalog = ["databricks-sdk", "databricks-sql-connector", "sqlglot"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<3.12" -content-hash = "02b2607b94c52edb3d0ef10578be05296ac0bd3d8efa14d2edf37fced349c38c" +content-hash = "ae8abd3fd4a132395bfd9cbd051ea8b05212943eea3a808933a5f49fa7fbc44f" diff --git a/pyproject.toml b/pyproject.toml index 41f4f70e..f3e8bff8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.112" +version = "0.14.113" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." 
authors = ["Metaphor "] @@ -42,7 +42,7 @@ llama-index-readers-confluence = { version = "^0.1.4", optional = true } llama-index-readers-notion = { version = "^0.1.6", optional = true } looker-sdk = { version = "^24.2.0", optional = true } lxml = { version = "~=5.0.0", optional = true } -metaphor-models = "0.40.2" +metaphor-models = "0.40.3" more-itertools = { version = "^10.1.0", optional = true } msal = { version = "^1.28.0", optional = true } msgraph-beta-sdk = { version = "~1.4.0", optional = true } diff --git a/tests/athena/__init__.py b/tests/athena/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json b/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json new file mode 100644 index 00000000..ffd8acd7 --- /dev/null +++ b/tests/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json @@ -0,0 +1,145 @@ +{ + "QueryExecutions": [ + { + "QueryExecutionId": "a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff", + "Query": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "StatementType": "DML", + "ResultConfiguration": { + "OutputLocation": "s3://metaphor-athena-output/athena/a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff.csv", + "EncryptionConfiguration": { + "EncryptionOption": "SSE_S3" + } + }, + "ResultReuseConfiguration": { + "ResultReuseByAgeConfiguration": { + "Enabled": false + } + }, + "QueryExecutionContext": { + "Database": "spectrum_db2", + "Catalog": "awsdatacatalog" + }, + "Status": { + "State": "SUCCEEDED", + "SubmissionDateTime": "2024-10-03 00:50:12.646000+08:00", + "CompletionDateTime": "2024-10-03 00:50:14.006000+08:00" + }, + "Statistics": { + "EngineExecutionTimeInMillis": 1219, + "DataScannedInBytes": 1208093, + "TotalExecutionTimeInMillis": 1360, + "QueryQueueTimeInMillis": 60, + "ServicePreProcessingTimeInMillis": 53, + "QueryPlanningTimeInMillis": 54, + "ServiceProcessingTimeInMillis": 28, + 
"ResultReuseInformation": { + "ReusedPreviousResult": false + } + }, + "WorkGroup": "primary", + "EngineVersion": { + "SelectedEngineVersion": "AUTO", + "EffectiveEngineVersion": "Athena engine version 3" + }, + "SubstatementType": "SELECT" + }, + { + "QueryExecutionId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab", + "Query": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "StatementType": "DML", + "ResultConfiguration": { + "OutputLocation": "s3://metaphor-athena-output/athena/8c944c49-ccc0-43b2-9dc9-e2428c76e8ab.csv", + "EncryptionConfiguration": { + "EncryptionOption": "SSE_S3" + } + }, + "ResultReuseConfiguration": { + "ResultReuseByAgeConfiguration": { + "Enabled": false + } + }, + "QueryExecutionContext": { + "Database": "spectrum_db2", + "Catalog": "awsdatacatalog" + }, + "Status": { + "State": "SUCCEEDED", + "SubmissionDateTime": "2024-10-02 16:48:48.443000+08:00", + "CompletionDateTime": "2024-10-02 16:48:49.205000+08:00" + }, + "Statistics": { + "EngineExecutionTimeInMillis": 600, + "DataScannedInBytes": 1208093, + "TotalExecutionTimeInMillis": 762, + "QueryQueueTimeInMillis": 66, + "ServicePreProcessingTimeInMillis": 68, + "QueryPlanningTimeInMillis": 55, + "ServiceProcessingTimeInMillis": 28, + "ResultReuseInformation": { + "ReusedPreviousResult": false + } + }, + "WorkGroup": "primary", + "EngineVersion": { + "SelectedEngineVersion": "AUTO", + "EffectiveEngineVersion": "Athena engine version 3" + }, + "SubstatementType": "SELECT" + }, + { + "QueryExecutionId": "a80c3d38-5a82-450c-a7f3-58bc476597d8", + "Query": "-- View Example\nCREATE OR REPLACE VIEW sales_view AS\nSELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime\nFROM sales\nWHERE commission > 10", + "StatementType": "DDL", + "ResultConfiguration": { + "OutputLocation": "s3://metaphor-athena-output/athena/a80c3d38-5a82-450c-a7f3-58bc476597d8.txt", + "EncryptionConfiguration": { + "EncryptionOption": "SSE_S3" + } + }, + "ResultReuseConfiguration": { + 
"ResultReuseByAgeConfiguration": { + "Enabled": false + } + }, + "QueryExecutionContext": { + "Database": "spectrum_db2", + "Catalog": "awsdatacatalog" + }, + "Status": { + "State": "SUCCEEDED", + "SubmissionDateTime": "2024-10-03 00:50:38.339000+08:00", + "CompletionDateTime": "2024-10-03 00:50:38.886000+08:00" + }, + "Statistics": { + "EngineExecutionTimeInMillis": 475, + "DataScannedInBytes": 0, + "TotalExecutionTimeInMillis": 547, + "QueryQueueTimeInMillis": 40, + "ServicePreProcessingTimeInMillis": 14, + "ServiceProcessingTimeInMillis": 18, + "ResultReuseInformation": { + "ReusedPreviousResult": false + } + }, + "WorkGroup": "primary", + "EngineVersion": { + "SelectedEngineVersion": "AUTO", + "EffectiveEngineVersion": "Athena engine version 3" + }, + "SubstatementType": "CREATE_VIEW" + } + ], + "UnprocessedQueryExecutionIds": [], + "ResponseMetadata": { + "RequestId": "ef77e890-ca4f-4e46-b4d3-ee90915237ae", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Thu, 03 Oct 2024 07:34:55 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "3250", + "connection": "keep-alive", + "x-amzn-requestid": "ef77e890-ca4f-4e46-b4d3-ee90915237ae" + }, + "RetryAttempts": 0 + } +} diff --git a/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json b/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json new file mode 100644 index 00000000..50a93f0d --- /dev/null +++ b/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json @@ -0,0 +1,20 @@ +{ + "DataCatalogsSummary": [ + { + "CatalogName": "AwsDataCatalog", + "Type": "GLUE" + } + ], + "ResponseMetadata": { + "RequestId": "f8346904-fad9-4b57-a4ca-4a5df6392d62", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Thu, 03 Oct 2024 06:47:14 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "99", + "connection": "keep-alive", + "x-amzn-requestid": "f8346904-fad9-4b57-a4ca-4a5df6392d62" + }, + "RetryAttempts": 0 
+ } +} diff --git a/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json b/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json new file mode 100644 index 00000000..def8831c --- /dev/null +++ b/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json @@ -0,0 +1,19 @@ +{ + "DatabaseList": [ + { + "Name": "spectrum_db2" + } + ], + "ResponseMetadata": { + "RequestId": "3039188f-6f71-4c41-b79c-cac7bcb905bc", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Thu, 03 Oct 2024 06:47:14 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "253", + "connection": "keep-alive", + "x-amzn-requestid": "3039188f-6f71-4c41-b79c-cac7bcb905bc" + }, + "RetryAttempts": 0 + } +} diff --git a/tests/athena/data/list_query_executions_3ec07c89-d2ee-414a-8490-c0a63471bd47.json b/tests/athena/data/list_query_executions_3ec07c89-d2ee-414a-8490-c0a63471bd47.json new file mode 100644 index 00000000..0cc25a17 --- /dev/null +++ b/tests/athena/data/list_query_executions_3ec07c89-d2ee-414a-8490-c0a63471bd47.json @@ -0,0 +1,19 @@ +{ + "QueryExecutionIds": [ + "a80c3d38-5a82-450c-a7f3-58bc476597d8", + "a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff", + "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab" + ], + "ResponseMetadata": { + "RequestId": "3ec07c89-d2ee-414a-8490-c0a63471bd47", + "HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Thu, 03 Oct 2024 07:34:55 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "140", + "connection": "keep-alive", + "x-amzn-requestid": "3ec07c89-d2ee-414a-8490-c0a63471bd47" + }, + "RetryAttempts": 0 + } +} diff --git a/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json b/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json new file mode 100644 index 00000000..6bb41415 --- /dev/null +++ b/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json @@ -0,0 +1,127 @@ +{ + "TableMetadataList": [ + { + 
"Name": "sales", + "CreateTime": "2022-09-07 21:46:03+08:00", + "LastAccessTime": "1970-01-01 08:00:00+08:00", + "TableType": "EXTERNAL_TABLE", + "Columns": [ + { + "Name": "salesid", + "Type": "int" + }, + { + "Name": "listid", + "Type": "int" + }, + { + "Name": "sellerid", + "Type": "int" + }, + { + "Name": "buyerid", + "Type": "int" + }, + { + "Name": "eventid", + "Type": "int" + }, + { + "Name": "dateid", + "Type": "smallint" + }, + { + "Name": "qtysold", + "Type": "smallint" + }, + { + "Name": "pricepaid", + "Type": "decimal(8,2)" + }, + { + "Name": "commission", + "Type": "decimal(8,2)" + }, + { + "Name": "saletime", + "Type": "timestamp" + } + ], + "PartitionKeys": [], + "Parameters": { + "EXTERNAL": "TRUE", + "inputformat": "org.apache.hadoop.mapred.TextInputFormat", + "location": "s3://metaphor-specturm/example", + "numRows": "172000", + "outputformat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "serde.param.field.delim": "\t", + "serde.param.serialization.format": "\t", + "serde.serialization.lib": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "transient_lastDdlTime": "1662558363" + } + }, + { + "Name": "sales_view", + "CreateTime": "2024-10-03 00:50:38+08:00", + "TableType": "VIRTUAL_VIEW", + "Columns": [ + { + "Name": "salesid", + "Type": "int" + }, + { + "Name": "listid", + "Type": "int" + }, + { + "Name": "sellerid", + "Type": "int" + }, + { + "Name": "buyerid", + "Type": "int" + }, + { + "Name": "dateid", + "Type": "smallint" + }, + { + "Name": "qtysold", + "Type": "smallint" + }, + { + "Name": "pricepaid", + "Type": "decimal(8,2)" + }, + { + "Name": "commission", + "Type": "decimal(8,2)" + }, + { + "Name": "saletime", + "Type": "timestamp" + } + ], + "Parameters": { + "comment": "Presto View", + "inputformat": null, + "location": "", + "outputformat": null, + "presto_view": "true", + "serde.serialization.lib": null + } + } + ], + "ResponseMetadata": { + "RequestId": "e6d7f820-7a9d-46ff-b479-df064d0c8c65", + 
"HTTPStatusCode": 200, + "HTTPHeaders": { + "date": "Thu, 03 Oct 2024 06:47:16 GMT", + "content-type": "application/x-amz-json-1.1", + "content-length": "1482", + "connection": "keep-alive", + "x-amzn-requestid": "e6d7f820-7a9d-46ff-b479-df064d0c8c65" + }, + "RetryAttempts": 0 + } +} diff --git a/tests/athena/expected.json b/tests/athena/expected.json new file mode 100644 index 00000000..ba1b42f7 --- /dev/null +++ b/tests/athena/expected.json @@ -0,0 +1,141 @@ +[ + { + "logicalId": { + "name": "awsdatacatalog.spectrum_db2.sales", + "platform": "ATHENA" + }, + "schema": { + "fields": [ + { + "fieldName": "salesid", + "fieldPath": "salesid", + "nativeType": "int" + }, + { + "fieldName": "listid", + "fieldPath": "listid", + "nativeType": "int" + }, + { + "fieldName": "sellerid", + "fieldPath": "sellerid", + "nativeType": "int" + }, + { + "fieldName": "buyerid", + "fieldPath": "buyerid", + "nativeType": "int" + }, + { + "fieldName": "eventid", + "fieldPath": "eventid", + "nativeType": "int" + }, + { + "fieldName": "dateid", + "fieldPath": "dateid", + "nativeType": "smallint" + }, + { + "fieldName": "qtysold", + "fieldPath": "qtysold", + "nativeType": "smallint" + }, + { + "fieldName": "pricepaid", + "fieldPath": "pricepaid", + "nativeType": "decimal(8,2)" + }, + { + "fieldName": "commission", + "fieldPath": "commission", + "nativeType": "decimal(8,2)" + }, + { + "fieldName": "saletime", + "fieldPath": "saletime", + "nativeType": "timestamp" + } + ], + "schemaType": "SQL", + "sqlSchema": { + "materialization": "TABLE" + } + }, + "sourceInfo": { + "createdAtSource": "2022-09-07T21:46:03+08:00" + }, + "structure": { + "database": "AwsDataCatalog", + "schema": "spectrum_db2", + "table": "sales" + } + }, + { + "logicalId": { + "name": "awsdatacatalog.spectrum_db2.sales_view", + "platform": "ATHENA" + }, + "schema": { + "fields": [ + { + "fieldName": "salesid", + "fieldPath": "salesid", + "nativeType": "int" + }, + { + "fieldName": "listid", + "fieldPath": "listid", + 
"nativeType": "int" + }, + { + "fieldName": "sellerid", + "fieldPath": "sellerid", + "nativeType": "int" + }, + { + "fieldName": "buyerid", + "fieldPath": "buyerid", + "nativeType": "int" + }, + { + "fieldName": "dateid", + "fieldPath": "dateid", + "nativeType": "smallint" + }, + { + "fieldName": "qtysold", + "fieldPath": "qtysold", + "nativeType": "smallint" + }, + { + "fieldName": "pricepaid", + "fieldPath": "pricepaid", + "nativeType": "decimal(8,2)" + }, + { + "fieldName": "commission", + "fieldPath": "commission", + "nativeType": "decimal(8,2)" + }, + { + "fieldName": "saletime", + "fieldPath": "saletime", + "nativeType": "timestamp" + } + ], + "schemaType": "SQL", + "sqlSchema": { + "materialization": "VIEW" + } + }, + "sourceInfo": { + "createdAtSource": "2024-10-03T00:50:38+08:00" + }, + "structure": { + "database": "AwsDataCatalog", + "schema": "spectrum_db2", + "table": "sales_view" + } + } +] diff --git a/tests/athena/expected_query_logs.json b/tests/athena/expected_query_logs.json new file mode 100644 index 00000000..4821ce00 --- /dev/null +++ b/tests/athena/expected_query_logs.json @@ -0,0 +1,60 @@ +[ + { + "duration": 1360, + "platform": "ATHENA", + "queryId": "a52ba71d-7cca-42e1-b4dc-db8c5c59e4ff", + "sources": [ + { + "database": "awsdatacatalog", + "id": "DATASET~107B2CFCE5331D515FDF1B587B0FE42F", + "schema": "spectrum_db2", + "table": "sales" + } + ], + "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d", + "startTime": "2024-10-03T00:50:12.646000+00:00", + "targets": [] + }, + { + "duration": 762, + "platform": "ATHENA", + "queryId": "8c944c49-ccc0-43b2-9dc9-e2428c76e8ab", + "sources": [ + { + "database": "awsdatacatalog", + "id": "DATASET~107B2CFCE5331D515FDF1B587B0FE42F", + "schema": "spectrum_db2", + "table": "sales" + } + ], + "sql": "SELECT * FROM \"spectrum_db2\".\"sales\" limit 10", + "sqlHash": "ab84f70dad2f22c7680b41d0dc8ca73d", + "startTime": "2024-10-02T16:48:48.443000+00:00", + 
"targets": [] + }, + { + "duration": 547, + "platform": "ATHENA", + "queryId": "a80c3d38-5a82-450c-a7f3-58bc476597d8", + "sources": [ + { + "database": "awsdatacatalog", + "id": "DATASET~107B2CFCE5331D515FDF1B587B0FE42F", + "schema": "spectrum_db2", + "table": "sales" + } + ], + "sql": "-- View Example\nCREATE OR REPLACE VIEW sales_view AS\nSELECT salesid, listid, sellerid, buyerid, dateid, qtysold, pricepaid, commission, saletime\nFROM sales\nWHERE commission > 10", + "sqlHash": "f89a26b30eb0e4fac013ff31f4ea900e", + "startTime": "2024-10-03T00:50:38.339000+00:00", + "targets": [ + { + "database": "awsdatacatalog", + "id": "DATASET~0B496130C48991D869F08AF85BE2F330", + "schema": "spectrum_db2", + "table": "sales_view" + } + ] + } +] diff --git a/tests/athena/test_extractor.py b/tests/athena/test_extractor.py new file mode 100644 index 00000000..1d927a96 --- /dev/null +++ b/tests/athena/test_extractor.py @@ -0,0 +1,92 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from metaphor.athena.config import AthenaRunConfig, AwsCredentials +from metaphor.athena.extractor import AthenaExtractor +from metaphor.common.base_config import OutputConfig +from metaphor.common.event_util import EventUtil +from tests.test_utils import load_json + + +def dummy_config(): + return AthenaRunConfig( + aws=AwsCredentials( + access_key_id="key", secret_access_key="secret", region_name="region" + ), + output=OutputConfig(), + ) + + +@patch("metaphor.athena.extractor.create_athena_client") +@pytest.mark.asyncio +async def test_extractor(mock_create_client: MagicMock, test_root_dir: str): + def mock_list_data_catalogs(): + yield load_json( + f"{test_root_dir}/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json" + ) + + def mock_list_databases(**_): + yield load_json( + f"{test_root_dir}/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json" + ) + + def mock_list_table_metadata(**_): + yield load_json( + 
f"{test_root_dir}/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json" + ) + + def mock_get_paginator(method: str): + if method == "list_data_catalogs": + mock_paginator = MagicMock() + mock_paginator.paginate = mock_list_data_catalogs + return mock_paginator + elif method == "list_databases": + mock_paginator = MagicMock() + mock_paginator.paginate = mock_list_databases + return mock_paginator + elif method == "list_table_metadata": + mock_paginator = MagicMock() + mock_paginator.paginate = mock_list_table_metadata + return mock_paginator + + mock_client = MagicMock() + mock_client.get_paginator = mock_get_paginator + mock_create_client.return_value = mock_client + + extractor = AthenaExtractor(dummy_config()) + events = [EventUtil.trim_event(e) for e in await extractor.extract()] + + assert events == load_json(f"{test_root_dir}/athena/expected.json") + + +@patch("metaphor.athena.extractor.create_athena_client") +@pytest.mark.asyncio +async def test_collect_query_logs(mock_create_client: MagicMock, test_root_dir: str): + def mock_list_query_executions(**_): + yield load_json( + f"{test_root_dir}/athena/data/list_query_executions_3ec07c89-d2ee-414a-8490-c0a63471bd47.json" + ) + + def mock_get_paginator(method: str): + if method == "list_query_executions": + mock_paginator = MagicMock() + mock_paginator.paginate = mock_list_query_executions + return mock_paginator + if method == "list_data_catalogs": + mock_paginator = MagicMock() + mock_paginator.paginate.return_value = [] + return mock_paginator + + mock_client = MagicMock() + mock_client.get_paginator = mock_get_paginator + mock_client.batch_get_query_execution.return_value = load_json( + f"{test_root_dir}/athena/data/batch_get_query_executions_ef77e890-ca4f-4e46-b4d3-ee90915237ae.json" + ) + mock_create_client.return_value = mock_client + + extractor = AthenaExtractor(dummy_config()) + assert list(await extractor.extract()) == [] + events = [EventUtil.trim_event(e) for e in 
extractor.collect_query_logs()] + + assert events == load_json(f"{test_root_dir}/athena/expected_query_logs.json")