From 62a41e8a710a54b8c619a26e7d368418c79f5a52 Mon Sep 17 00:00:00 2001 From: alyiwang Date: Tue, 22 Oct 2024 12:26:05 -0700 Subject: [PATCH 1/2] Redshift profiler missing tables [sc-29514] --- metaphor/postgresql/profile/extractor.py | 42 +++++++++++++++++------- pyproject.toml | 2 +- 2 files changed, 32 insertions(+), 12 deletions(-) diff --git a/metaphor/postgresql/profile/extractor.py b/metaphor/postgresql/profile/extractor.py index 50b6a7bd..7f58a52f 100644 --- a/metaphor/postgresql/profile/extractor.py +++ b/metaphor/postgresql/profile/extractor.py @@ -1,6 +1,6 @@ import asyncio import traceback -from typing import Collection, Iterable, List +from typing import Collection, List try: import asyncpg @@ -61,7 +61,11 @@ async def extract(self) -> Collection[ENTITY_TYPES]: await asyncio.gather(*coroutines) - return self._datasets.values() + return [ + dataset + for dataset in self._datasets.values() + if self._trim_fields_and_check_empty_dataset(dataset) + ] async def _profile_database(self, database: str) -> None: pool = await self._create_connection_pool() @@ -69,18 +73,28 @@ async def _profile_database(self, database: str) -> None: async with pool.acquire() as conn: await self._fetch_tables(conn, database) datasets = await self._fetch_columns(conn, database) - logger.info(f"Include {len(datasets)} tables from {database}") + logger.info(f"Include {len(datasets)} datasets from {database}") tasks = [ self._profile_dataset(pool, dataset) for dataset in datasets - if dataset.schema.sql_schema.materialization != MaterializationType.VIEW - or not self._include_views + if self._filter_dataset_type(dataset) ] await asyncio.gather(*tasks) await pool.close() - self._trim_fields(datasets) + def _filter_dataset_type(self, dataset: Dataset) -> bool: + """ + Filter out dataset types based on the config, not profile "External", "Stream" and "Snapshot" + """ + dataset_type = dataset.schema.sql_schema.materialization + if self._include_views: + return dataset_type in { + MaterializationType.TABLE, + MaterializationType.VIEW, + MaterializationType.MATERIALIZED_VIEW, + } + return dataset_type == MaterializationType.TABLE async def _profile_dataset(self, pool: asyncpg.Pool, dataset: Dataset) -> None: async with pool.acquire() as conn: @@ -224,8 +238,14 @@ def _init_dataset( ) @staticmethod - def _trim_fields(datasets: Iterable[Dataset]) -> None: - """Drop temporary fields""" - for dataset in datasets: - dataset.schema = None - dataset.statistics = None + def _trim_fields_and_check_empty_dataset(dataset: Dataset) -> bool: + """Drop temporary fields and check if the dataset field statistic is empty""" + if ( + not dataset.field_statistics + or not dataset.field_statistics.field_statistics + ): + return False + + dataset.schema = None + dataset.statistics = None + return True diff --git a/pyproject.toml b/pyproject.toml index 3f44349a..1638d7f0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.134" +version = "0.14.135" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] From 8637885f5c29c06c8e0f779ea982774edac092a6 Mon Sep 17 00:00:00 2001 From: alyiwang Date: Tue, 22 Oct 2024 13:27:03 -0700 Subject: [PATCH 2/2] add tests --- tests/postgresql/profile/test_extractor.py | 34 ++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/tests/postgresql/profile/test_extractor.py b/tests/postgresql/profile/test_extractor.py index b10fa9ff..6e37ccee 100644 --- a/tests/postgresql/profile/test_extractor.py +++ b/tests/postgresql/profile/test_extractor.py @@ -1,3 +1,4 @@ +from metaphor.common.base_config import OutputConfig from metaphor.common.column_statistics import ColumnStatistics from metaphor.common.sampling import SamplingConfig from metaphor.models.metadata_change_event import ( @@ -8,9 +9,12 @@ DatasetSchema, DatasetStatistics, FieldStatistics, + MaterializationType, SchemaField, + SQLSchema, ) from metaphor.postgresql.profile.extractor import PostgreSQLProfileExtractor +from metaphor.redshift.profile.config import RedshiftProfileRunConfig column_statistics = ColumnStatistics(unique_count=True, avg_value=True) @@ -23,12 +27,42 @@ def init_dataset(name: str, row_count) -> Dataset: dataset.schema = DatasetSchema() dataset.schema.fields = [] + dataset.schema.sql_schema = SQLSchema() dataset.statistics = DatasetStatistics() dataset.statistics.record_count = float(row_count) return dataset +def test_filter_dataset_type(): + table = init_dataset(name="1", row_count=1000) + table.schema.sql_schema.materialization = MaterializationType.TABLE + + view = init_dataset(name="2", row_count=1000) + view.schema.sql_schema.materialization = MaterializationType.VIEW + + external = init_dataset(name="3", row_count=1000) + external.schema.sql_schema.materialization = MaterializationType.EXTERNAL + + config = RedshiftProfileRunConfig( + host="", + database="", + user="", + password="", + output=OutputConfig(), + ) + extractor_filter_view = PostgreSQLProfileExtractor(config) + assert extractor_filter_view._filter_dataset_type(table) + assert not extractor_filter_view._filter_dataset_type(view) + assert not extractor_filter_view._filter_dataset_type(external) + + config.include_views = True + extractor_include_view = PostgreSQLProfileExtractor(config) + assert extractor_include_view._filter_dataset_type(table) + assert extractor_include_view._filter_dataset_type(view) + assert not extractor_include_view._filter_dataset_type(external) + + def test_build_profiling_query(): dataset = init_dataset(name="foo", row_count=1000) dataset.schema.fields = [