Skip to content

Commit

Permalink
BQ crawler filtering support as project [sc-29768]
Browse files (browse the repository at this point in the history)
  • Loading branch information
alyiwang committed Nov 8, 2024
1 parent a980be2 commit 8b0dde9
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 32 deletions.
31 changes: 15 additions & 16 deletions metaphor/common/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,23 +200,22 @@ def include_schema(self, database: str, schema: str) -> bool:
schema_lower = schema.lower()

def covered_by_filter(database_filter: DatabaseFilter, partial: bool):
if database_lower not in database_filter:
return False

schema_filter = database_filter[database_lower]

# empty schema filter
if schema_filter is None or len(schema_filter) == 0:
return True

for schema_pattern, table_filter in schema_filter.items():
# got a match
if fnmatch(schema_lower, schema_pattern):
# fully covered
if table_filter is None or len(table_filter) == 0:
# check each database pattern
for pattern, schema_filter in database_filter.items():
if fnmatch(database_lower, pattern):
# empty schema filter, match any schema
if schema_filter is None or len(schema_filter) == 0:
return True
else:
return partial

# check each schema pattern
for schema_pattern, table_filter in schema_filter.items():
# got a match
if fnmatch(schema_lower, schema_pattern):
# fully covered
if table_filter is None or len(table_filter) == 0:
return True
else:
return partial

return False

Expand Down
1 change: 1 addition & 0 deletions metaphor/common/sink.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ def write_events(self, events: List[MetadataChangeEvent]) -> bool:
]

if len(valid_records) == 0:
logger.info("No valid MCE records to write")
return False

return self._sink(valid_records)
Expand Down
11 changes: 6 additions & 5 deletions metaphor/postgresql/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,11 +385,12 @@ def from_config_file(config_file: str) -> "PostgreSQLExtractor":
async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info(f"Fetching metadata from postgreSQL host {self._host}")

databases = (
await self._fetch_databases()
if self._filter.includes is None
else list(self._filter.includes.keys())
)
databases = [
db
for db in (await self._fetch_databases())
if self._filter.include_database(db)
]
logger.info(f"Databases to include: {databases}")

for db in databases:
conn = await self._connect_database(db)
Expand Down
17 changes: 7 additions & 10 deletions metaphor/postgresql/profile/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,14 @@ def __init__(self, config: PostgreSQLProfileRunConfig):
async def extract(self) -> Collection[ENTITY_TYPES]:
logger.info(f"Fetching data profile from host {self._host}")

databases = (
await self._fetch_databases()
if self._filter.includes is None
else list(self._filter.includes.keys())
)

coroutines = [
self._profile_database(database)
for database in databases
if self._filter.include_database(database)
databases = [
db
for db in (await self._fetch_databases())
if self._filter.include_database(db)
]
logger.info(f"Databases to include: {databases}")

coroutines = [self._profile_database(database) for database in databases]

await asyncio.gather(*coroutines)

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "metaphor-connectors"
version = "0.14.151"
version = "0.14.152"
license = "Apache-2.0"
description = "A collection of Python-based 'connectors' that extract metadata from various sources to ingest into the Metaphor app."
authors = ["Metaphor <[email protected]>"]
Expand Down
17 changes: 17 additions & 0 deletions tests/common/test_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,19 @@ def test_include_schema_glob_patterns():
assert not filter.include_schema("db", "bar")
assert not filter.include_schema("db", "uhoh")

filter = DatasetFilter(
includes={"*": {"test*": None}},
)

assert filter.include_schema("foo", "test1")
assert not filter.include_schema("foo", "bar")

filter = DatasetFilter(
includes={"foo*": {"test*": None}}, excludes={"foo_bar": None}
)
assert filter.include_schema("foo_baz", "test1")
assert not filter.include_schema("foo_bar", "test1")


def test_merge():
f1 = DatasetFilter()
Expand Down Expand Up @@ -284,3 +297,7 @@ def test_include_database():
filter = DatasetFilter(includes={"foo*": None}, excludes={"foo_bar": None})
assert filter.include_database("foo_baz")
assert not filter.include_database("foo_bar")

filter = DatasetFilter(includes={"*": None}, excludes={"foo_bar": None})
assert filter.include_database("foo_baz")
assert not filter.include_database("foo_bar")

0 comments on commit 8b0dde9

Please sign in to comment.