From 8b0dde9667f013947a912977b575d62b203fb379 Mon Sep 17 00:00:00 2001 From: alyiwang Date: Fri, 8 Nov 2024 11:26:10 -0800 Subject: [PATCH] BQ crawler filtering support as project [sc-29768] --- metaphor/common/filter.py | 31 ++++++++++++------------ metaphor/common/sink.py | 1 + metaphor/postgresql/extractor.py | 11 +++++---- metaphor/postgresql/profile/extractor.py | 17 ++++++------- pyproject.toml | 2 +- tests/common/test_filter.py | 17 +++++++++++++ 6 files changed, 47 insertions(+), 32 deletions(-) diff --git a/metaphor/common/filter.py b/metaphor/common/filter.py index 32376700..7f583183 100644 --- a/metaphor/common/filter.py +++ b/metaphor/common/filter.py @@ -200,23 +200,22 @@ def include_schema(self, database: str, schema: str) -> bool: schema_lower = schema.lower() def covered_by_filter(database_filter: DatabaseFilter, partial: bool): - if database_lower not in database_filter: - return False - - schema_filter = database_filter[database_lower] - - # empty schema filter - if schema_filter is None or len(schema_filter) == 0: - return True - - for schema_pattern, table_filter in schema_filter.items(): - # got a match - if fnmatch(schema_lower, schema_pattern): - # fully covered - if table_filter is None or len(table_filter) == 0: + # check each database pattern + for pattern, schema_filter in database_filter.items(): + if fnmatch(database_lower, pattern): + # empty schema filter, match any schema + if schema_filter is None or len(schema_filter) == 0: return True - else: - return partial + + # check each schema pattern + for schema_pattern, table_filter in schema_filter.items(): + # got a match + if fnmatch(schema_lower, schema_pattern): + # fully covered + if table_filter is None or len(table_filter) == 0: + return True + else: + return partial return False diff --git a/metaphor/common/sink.py b/metaphor/common/sink.py index ee7c8d24..519fb998 100644 --- a/metaphor/common/sink.py +++ b/metaphor/common/sink.py @@ -28,6 +28,7 @@ def write_events(self, events: List[MetadataChangeEvent]) -> bool: ] if len(valid_records) == 0: + logger.info("No valid MCE records to write") return False return self._sink(valid_records) diff --git a/metaphor/postgresql/extractor.py b/metaphor/postgresql/extractor.py index f4f8b879..63781e53 100644 --- a/metaphor/postgresql/extractor.py +++ b/metaphor/postgresql/extractor.py @@ -385,11 +385,12 @@ def from_config_file(config_file: str) -> "PostgreSQLExtractor": async def extract(self) -> Collection[ENTITY_TYPES]: logger.info(f"Fetching metadata from postgreSQL host {self._host}") - databases = ( - await self._fetch_databases() - if self._filter.includes is None - else list(self._filter.includes.keys()) - ) + databases = [ + db + for db in (await self._fetch_databases()) + if self._filter.include_database(db) + ] + logger.info(f"Databases to include: {databases}") for db in databases: conn = await self._connect_database(db) diff --git a/metaphor/postgresql/profile/extractor.py b/metaphor/postgresql/profile/extractor.py index 7f58a52f..eabf89ad 100644 --- a/metaphor/postgresql/profile/extractor.py +++ b/metaphor/postgresql/profile/extractor.py @@ -47,17 +47,14 @@ def __init__(self, config: PostgreSQLProfileRunConfig): async def extract(self) -> Collection[ENTITY_TYPES]: logger.info(f"Fetching data profile from host {self._host}") - databases = ( - await self._fetch_databases() - if self._filter.includes is None - else list(self._filter.includes.keys()) - ) - - coroutines = [ - self._profile_database(database) - for database in databases - if self._filter.include_database(database) + databases = [ + db + for db in (await self._fetch_databases()) + if self._filter.include_database(db) ] + logger.info(f"Databases to include: {databases}") + + coroutines = [self._profile_database(database) for database in databases] await asyncio.gather(*coroutines) diff --git a/pyproject.toml b/pyproject.toml index 76b80906..e9a5ad29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "metaphor-connectors" -version = "0.14.151" +version = "0.14.152" license = "Apache-2.0" description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app." authors = ["Metaphor "] diff --git a/tests/common/test_filter.py b/tests/common/test_filter.py index 88be27a1..38b4c0e9 100644 --- a/tests/common/test_filter.py +++ b/tests/common/test_filter.py @@ -165,6 +165,19 @@ def test_include_schema_glob_patterns(): assert not filter.include_schema("db", "bar") assert not filter.include_schema("db", "uhoh") + filter = DatasetFilter( + includes={"*": {"test*": None}}, + ) + + assert filter.include_schema("foo", "test1") + assert not filter.include_schema("foo", "bar") + + filter = DatasetFilter( + includes={"foo*": {"test*": None}}, excludes={"foo_bar": None} + ) + assert filter.include_schema("foo_baz", "test1") + assert not filter.include_schema("foo_bar", "test1") + def test_merge(): f1 = DatasetFilter() @@ -284,3 +297,7 @@ def test_include_database(): filter = DatasetFilter(includes={"foo*": None}, excludes={"foo_bar": None}) assert filter.include_database("foo_baz") assert not filter.include_database("foo_bar") + + filter = DatasetFilter(includes={"*": None}, excludes={"foo_bar": None}) + assert filter.include_database("foo_baz") + assert not filter.include_database("foo_bar")