Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Adding config option to auto lowercase dataset urns #8928

Merged
merged 15 commits into from
Oct 12, 2023
16 changes: 16 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_lowercase_urns,
auto_materialize_referenced_tags,
auto_status_aspect,
auto_workunit_reporter,
Expand Down Expand Up @@ -192,7 +193,17 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run
)

auto_lowercase_urns: Optional[MetadataWorkUnitProcessor] = None
if (
self.ctx.pipeline_config
and self.ctx.pipeline_config.flags.auto_lowercase_urns
):
auto_lowercase_urns = self._get_auto_lowercase_urn_processor(
enabled=self.ctx.pipeline_config.flags.auto_lowercase_urns
)

return [
auto_lowercase_urns,
auto_status_aspect,
auto_materialize_referenced_tags,
browse_path_processor,
Expand Down Expand Up @@ -238,6 +249,11 @@ def get_report(self) -> SourceReport:
def close(self) -> None:
    """Do nothing; this implementation performs no cleanup."""

def _get_auto_lowercase_urn_processor(
    self, enabled: bool
) -> MetadataWorkUnitProcessor:
    """Build the workunit processor that lowercases dataset urns.

    The returned callable is ``auto_lowercase_urns`` with its ``enabled``
    keyword pre-bound, so it only needs the workunit stream as an argument.
    """
    lowercase_processor = partial(auto_lowercase_urns, enabled=enabled)
    return lowercase_processor

def _get_browse_path_processor(self, dry_run: bool) -> MetadataWorkUnitProcessor:
config = self.get_config()
platform = getattr(self, "platform", None) or getattr(config, "platform", None)
Expand Down
38 changes: 37 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from datahub.emitter.mce_builder import make_dataplatform_instance_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
BrowsePathEntryClass,
BrowsePathsClass,
Expand All @@ -33,8 +34,9 @@
)
from datahub.telemetry import telemetry
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.error import InvalidUrnError
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn import Urn, guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns

if TYPE_CHECKING:
Expand Down Expand Up @@ -173,6 +175,40 @@ def auto_materialize_referenced_tags(
).as_workunit()


def auto_lowercase_urns(
    stream: Iterable[MetadataWorkUnit], enabled: bool = False
) -> Iterable[MetadataWorkUnit]:
    """Lowercase the dataset name in the urn of every dataset workunit.

    Workunits are modified in place and then yielded in their original order.
    For a workunit whose urn is a dataset urn, the urn is rebuilt with the same
    platform name and env but a lowercased dataset name, written back to the
    workunit's metadata, and the old dataset name inside ``wu.id`` is replaced
    by the lowercased one. Every other workunit is yielded untouched.

    NOTE(review): only the workunit's own urn is rewritten. Dataset urns that
    appear inside aspects (e.g. lineage edges) are not lowercased; a reviewer
    suggested switching to the ``lowercase_dataset_urns`` helper to cover
    those as well.

    Args:
        stream: The workunits to process.
        enabled: When false, the stream is passed through without changes.

    Yields:
        Every workunit from ``stream``, exactly once.
    """
    if not enabled:
        # This function is a generator, so `return stream` would simply end
        # iteration and silently drop every workunit. Re-yield them instead.
        yield from stream
        return

    for wu in stream:
        try:
            urn = Urn.create_from_string(wu.get_urn())
            if urn.get_type() == DatasetUrn.ENTITY_TYPE:
                dataset_urn = DatasetUrn.create_from_string(str(urn))
                lowercased_urn = DatasetUrn.create_from_ids(
                    dataset_urn.get_data_platform_urn().get_platform_name(),
                    dataset_urn.get_dataset_name().lower(),
                    dataset_urn.get_env(),
                )

                # MCEs carry the urn on the proposed snapshot; the other
                # metadata types handled here expose it as `entityUrn`.
                if isinstance(wu.metadata, MetadataChangeEvent):
                    wu.metadata.proposedSnapshot.urn = str(lowercased_urn)
                else:
                    wu.metadata.entityUrn = str(lowercased_urn)

                # Keep the workunit id in sync with the rewritten urn.
                wu.id = wu.id.replace(
                    dataset_urn.get_dataset_name(),
                    lowercased_urn.get_dataset_name(),
                )
        except InvalidUrnError:
            # Urns that cannot be parsed are passed through unchanged.
            pass
        yield wu


def auto_browse_path_v2(
stream: Iterable[MetadataWorkUnit],
*,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ class FlagsConfig(ConfigModel):
),
)

auto_lowercase_urns: bool = Field(
hsheth2 marked this conversation as resolved.
Show resolved Hide resolved
default=False,
description="Whether to lowercase dataset entity urns.",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo: Wether -> whether

)


class PipelineConfig(ConfigModel):
# Once support for discriminated unions gets merged into Pydantic, we can
Expand Down
41 changes: 41 additions & 0 deletions metadata-ingestion/tests/unit/test_source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datahub.ingestion.api.source_helpers import (
auto_browse_path_v2,
auto_empty_dataset_usage_statistics,
auto_lowercase_urns,
auto_status_aspect,
auto_workunit,
)
Expand Down Expand Up @@ -275,6 +276,46 @@ def test_auto_browse_path_v2_legacy_browse_path(telemetry_ping_mock):
assert paths["platform,dataset-2,PROD)"] == _make_browse_path_entries(["something"])


def test_auto_lowercase_aspects():
    """Dataset urns are lowercased; non-dataset urns are left as they are."""
    container_urn = "urn:li:container:008e111aa1d250dd52e0fd5d4b307b1a"

    # Input: one mixed-case dataset workunit and one container workunit.
    dataset_wu = MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn("bigquery", "myProject.mySchema.myTable", "PROD"),
        aspect=models.DatasetKeyClass(
            "bigquery", "myProject.mySchema.myTable", "PROD"
        ),
    ).as_workunit()
    container_wu = MetadataChangeProposalWrapper(
        entityUrn=container_urn,
        aspect=models.ContainerPropertiesClass(
            name="test",
        ),
    ).as_workunit()
    input_workunits = [dataset_wu, container_wu]

    # Expected: same aspects, but the dataset entity urn is fully lowercased.
    expected_mcps = [
        MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,myproject.myschema.mytable,PROD)",
            aspect=models.DatasetKeyClass(
                "bigquery", "myProject.mySchema.myTable", "PROD"
            ),
        ),
        MetadataChangeProposalWrapper(
            entityUrn=container_urn,
            aspect=models.ContainerPropertiesClass(
                name="test",
            ),
        ),
    ]
    expected_workunits = list(auto_workunit(expected_mcps))

    actual_workunits = list(auto_lowercase_urns(input_workunits, True))
    assert actual_workunits == expected_workunits


@patch("datahub.ingestion.api.source_helpers.telemetry.telemetry_instance.ping")
def test_auto_browse_path_v2_container_over_legacy_browse_path(telemetry_ping_mock):
structure = {"a": {"b": ["c"]}}
Expand Down