Skip to content

Commit

Permalink
add upstream postgres db whitelisting feature
Browse files Browse the repository at this point in the history
  • Loading branch information
danielfordfc committed Oct 23, 2023
1 parent f878a4d commit 9324085
Showing 1 changed file with 39 additions and 0 deletions.
39 changes: 39 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ class TableauConfig(
description="Comma separated list of platforms to not ingest upstream lineage for",
)

upstream_postgres_database_whitelist: Optional[str] = Field(
default="",
description="Comma separated list of postgres databases to include in upstream lineage",
)

# pre = True because we want to take some decision before pydantic initialize the configuration to default values
@root_validator(pre=True)
def projects_backward_compatibility(cls, values: Dict) -> Dict:
Expand Down Expand Up @@ -510,6 +515,22 @@ def __init__(
# return empty list if the config is not set
self.ignore_upstream_lineage_platforms = []

# if ignore upstream lineage platforms doesn't contain "postgres", then consult the upstream_postgres_database_whitelist
if "postgres" not in self.ignore_upstream_lineage_platforms:
if self.config.upstream_postgres_database_whitelist:
self.upstream_postgres_database_whitelist = [
x.strip()
for x in (
self.config.upstream_postgres_database_whitelist.split(",")
)
]
else:
# return empty list if the config is not set
self.upstream_postgres_database_whitelist = []
else:
self.upstream_postgres_database_whitelist = []


self._authenticate()

def close(self) -> None:
Expand Down Expand Up @@ -1020,6 +1041,24 @@ def get_upstream_tables(self, tables, datasource_name, browse_path, is_custom_sq
)
continue

# skip upstream tables if the source is postgres and the database is not whitelisted
if table.get(tableau_constant.CONNECTION_TYPE) == "postgres":
upstream_db = (
table[tableau_constant.DATABASE][tableau_constant.NAME]
if table.get(tableau_constant.DATABASE)
and table[tableau_constant.DATABASE].get(tableau_constant.NAME)
else ""
)
if (
upstream_db
and upstream_db not in self.upstream_postgres_database_whitelist
):
logger.debug(
f"Skipping upstream table {table[tableau_constant.ID]}, database {upstream_db} not whitelisted"
)
continue


schema = table.get(tableau_constant.SCHEMA) or ""
table_name = table.get(tableau_constant.NAME) or ""
full_name = table.get(tableau_constant.FULL_NAME) or ""
Expand Down

0 comments on commit 9324085

Please sign in to comment.