diff --git a/metaphor/athena/extractor.py b/metaphor/athena/extractor.py index 760a153d..e7869e71 100644 --- a/metaphor/athena/extractor.py +++ b/metaphor/athena/extractor.py @@ -55,6 +55,7 @@ def __init__(self, config: AthenaRunConfig) -> None: super().__init__(config) self._datasets: Dict[str, Dataset] = {} self._aws_config = config.aws + self._filter = config.filter.normalize() async def extract(self) -> Collection[ENTITY_TYPES]: logger.info("Fetching metadata from Athena") @@ -62,8 +63,16 @@ async def extract(self) -> Collection[ENTITY_TYPES]: self._client = create_athena_client(self._aws_config) for catalog in self._get_catalogs(): + if not self._filter.include_database(database_name=catalog): + logger.info(f"Skipping catalog: {catalog}") + continue + databases = self._get_databases(catalog) for database in databases: + if not self._filter.include_schema(database=catalog, schema=database): + logger.info(f"Skipping database: {catalog}.{database}") + continue + self._extract_tables(catalog, database) return self._datasets.values() @@ -110,9 +119,13 @@ def _paginate_and_dump_response(self, api_endpoint: str, **paginator_args): yield page def _init_dataset(self, catalog: str, database: str, table_metadata: TableMetadata): - name = dataset_normalized_name( - db=catalog, schema=database, table=table_metadata.Name - ) + table = table_metadata.Name + + if not self._filter.include_table(catalog, database, table): + logger.info(f"Skipping table: {catalog}.{database}.{table}") + return + + name = dataset_normalized_name(db=catalog, schema=database, table=table) table_type = ( TableTypeEnum(table_metadata.TableType) diff --git a/pyproject.toml b/pyproject.toml index 1134cbdc..55d1806e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.125" +version = "0.14.126" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json b/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json index 50a93f0d..10e8173f 100644 --- a/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json +++ b/tests/athena/data/list_data_catalogs_f8346904-fad9-4b57-a4ca-4a5df6392d62.json @@ -3,6 +3,10 @@ { "CatalogName": "AwsDataCatalog", "Type": "GLUE" + }, + { + "CatalogName": "Test", + "Type": "GLUE" } ], "ResponseMetadata": { diff --git a/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json b/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json index def8831c..2050fef8 100644 --- a/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json +++ b/tests/athena/data/list_databases_3039188f-6f71-4c41-b79c-cac7bcb905bc.json @@ -2,6 +2,9 @@ "DatabaseList": [ { "Name": "spectrum_db2" + }, + { + "Name": "Foo" } ], "ResponseMetadata": { diff --git a/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json b/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json index 6bb41415..08108a8d 100644 --- a/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json +++ b/tests/athena/data/list_table_metadata_e6d7f820-7a9d-46ff-b479-df064d0c8c65.json @@ -110,6 +110,30 @@ "presto_view": "true", "serde.serialization.lib": null } + }, + { + "Name": "table", + "CreateTime": "2022-09-07 21:46:03+08:00", + "LastAccessTime": "1970-01-01 08:00:00+08:00", + "TableType": "EXTERNAL_TABLE", + "Columns": [ + { + "Name": "salesid", + "Type": "int" + } + ], + "PartitionKeys": [], + "Parameters": { + "EXTERNAL": "TRUE", + "inputformat": "org.apache.hadoop.mapred.TextInputFormat", + "location": "s3://metaphor-specturm/example", + "numRows": "172000", + "outputformat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat", + "serde.param.field.delim": "\t", + "serde.param.serialization.format": "\t", + "serde.serialization.lib": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe", + "transient_lastDdlTime": "1662558363" + } } ], "ResponseMetadata": { diff --git a/tests/athena/test_extractor.py b/tests/athena/test_extractor.py index 1d927a96..9f49a107 100644 --- a/tests/athena/test_extractor.py +++ b/tests/athena/test_extractor.py @@ -6,6 +6,7 @@ from metaphor.athena.extractor import AthenaExtractor from metaphor.common.base_config import OutputConfig from metaphor.common.event_util import EventUtil +from metaphor.common.filter import DatasetFilter from tests.test_utils import load_json @@ -14,6 +15,12 @@ def dummy_config(): aws=AwsCredentials( access_key_id="key", secret_access_key="secret", region_name="region" ), + filter=DatasetFilter( + excludes={ + "test": None, + "awsdatacatalog": {"foo": None, "spectrum_db2": set(["table"])}, + } + ), output=OutputConfig(), )