Skip to content

Commit

Permalink
fix(ingest/glue): Add additional checks and logging when specifying catalog_id (#12168)
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Dec 24, 2024
1 parent f4b33b5 commit 756b199
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 12 deletions.
14 changes: 12 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
platform_name,
support_status,
)
from datahub.ingestion.api.report import EntityFilterReport
from datahub.ingestion.api.source import MetadataWorkUnitProcessor
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws import s3_util
Expand Down Expand Up @@ -115,7 +116,6 @@

logger = logging.getLogger(__name__)


DEFAULT_PLATFORM = "glue"
VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"]

Expand Down Expand Up @@ -220,6 +220,7 @@ def platform_validator(cls, v: str) -> str:
class GlueSourceReport(StaleEntityRemovalSourceReport):
tables_scanned = 0
filtered: List[str] = dataclass_field(default_factory=list)
databases: EntityFilterReport = EntityFilterReport.field(type="database")

num_job_script_location_missing: int = 0
num_job_script_location_invalid: int = 0
Expand Down Expand Up @@ -668,6 +669,7 @@ def get_datajob_wu(self, node: Dict[str, Any], job_name: str) -> MetadataWorkUni
return MetadataWorkUnit(id=f'{job_name}-{node["Id"]}', mce=mce)

def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
logger.debug("Getting all databases")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetDatabases.html
paginator = self.glue_client.get_paginator("get_databases")

Expand All @@ -684,10 +686,18 @@ def get_all_databases(self) -> Iterable[Mapping[str, Any]]:
pattern += "[?!TargetDatabase]"

for database in paginator_response.search(pattern):
if self.source_config.database_pattern.allowed(database["Name"]):
if (not self.source_config.database_pattern.allowed(database["Name"])) or (
self.source_config.catalog_id
and database.get("CatalogId")
and database.get("CatalogId") != self.source_config.catalog_id
):
self.report.databases.dropped(database["Name"])
else:
self.report.databases.processed(database["Name"])
yield database

def get_tables_from_database(self, database: Mapping[str, Any]) -> Iterable[Dict]:
logger.debug(f"Getting tables from database {database['Name']}")
# see https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue/paginator/GetTables.html
paginator = self.glue_client.get_paginator("get_tables")
database_name = database["Name"]
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/tests/unit/glue/glue_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"CreateTime": "June 01, 2021 at 14:55:13"
},
"name": "empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:123412341234:database/empty-database",
"qualifiedName": "arn:aws:glue:us-west-2:000000000000:database/empty-database",
"env": "PROD"
}
}
Expand Down
43 changes: 39 additions & 4 deletions metadata-ingestion/tests/unit/glue/test_glue_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
validate_all_providers_have_committed_successfully,
)
from tests.unit.glue.test_glue_source_stubs import (
databases_1,
databases_2,
empty_database,
flights_database,
get_bucket_tagging,
get_databases_delta_response,
get_databases_response,
Expand Down Expand Up @@ -64,6 +64,7 @@
tables_2,
tables_profiling_1,
target_database_tables,
test_database,
)

FROZEN_TIME = "2020-04-14 07:00:00"
Expand Down Expand Up @@ -310,6 +311,40 @@ def test_config_without_platform():
assert source.platform == "glue"


def test_get_databases_filters_by_catalog():
    """Verify get_all_databases() honors the configured catalog_id.

    Without a catalog_id every stubbed database is yielded; with one, databases
    belonging to a different catalog are filtered out and recorded as dropped
    in the source report.
    """

    def names_of(databases):
        # Compare databases by name only, ignoring the rest of the payload.
        return {db["Name"] for db in databases}

    # Case 1: no catalog_id configured — everything comes through, nothing dropped.
    source_all: GlueSource = GlueSource(
        config=GlueSourceConfig(aws_region="us-west-2"),
        ctx=PipelineContext(run_id="glue-source-test"),
    )
    with Stubber(source_all.glue_client) as stub:
        stub.add_response("get_databases", get_databases_response, {})

        got = source_all.get_all_databases()
        assert names_of(got) == names_of(
            [flights_database, test_database, empty_database]
        )
        assert source_all.report.databases.dropped_entities.as_obj() == []

    # Case 2: catalog_id configured — the database from the other catalog is
    # excluded from the results and shows up in the dropped-entities report.
    catalog_id = "123412341234"
    source_single: GlueSource = GlueSource(
        config=GlueSourceConfig(catalog_id=catalog_id, aws_region="us-west-2"),
        ctx=PipelineContext(run_id="glue-source-test"),
    )
    with Stubber(source_single.glue_client) as stub:
        stub.add_response(
            "get_databases", get_databases_response, {"CatalogId": catalog_id}
        )

        got = source_single.get_all_databases()
        assert names_of(got) == names_of([flights_database, test_database])
        assert source_single.report.databases.dropped_entities.as_obj() == [
            "empty-database"
        ]


@freeze_time(FROZEN_TIME)
def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
deleted_actor_golden_mcs = "{}/glue_deleted_actor_mces_golden.json".format(
Expand Down Expand Up @@ -357,8 +392,8 @@ def test_glue_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
tables_on_first_call = tables_1
tables_on_second_call = tables_2
mock_get_all_databases_and_tables.side_effect = [
(databases_1, tables_on_first_call),
(databases_2, tables_on_second_call),
([flights_database], tables_on_first_call),
([test_database], tables_on_second_call),
]

pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
Expand Down
8 changes: 5 additions & 3 deletions metadata-ingestion/tests/unit/glue/test_glue_source_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,14 @@
"Permissions": ["ALL"],
}
],
"CatalogId": "123412341234",
"CatalogId": "000000000000",
},
]
}
databases_1 = [{"Name": "flights-database", "CatalogId": "123412341234"}]
databases_2 = [{"Name": "test-database", "CatalogId": "123412341234"}]
# Minimal Glue database records shared by the catalog-filtering tests.
# flights-database and test-database belong to catalog 123412341234;
# empty-database belongs to a different catalog (000000000000).
flights_database = dict(Name="flights-database", CatalogId="123412341234")
test_database = dict(Name="test-database", CatalogId="123412341234")
empty_database = dict(Name="empty-database", CatalogId="000000000000")

tables_1 = [
{
"Name": "avro",
Expand Down

0 comments on commit 756b199

Please sign in to comment.