-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion): Adding config option to auto lowercase dataset urns #8928
Changes from 11 commits
6b2d222
70de905
2727ee7
8980153
241a676
461e8d8
c52ea07
e507f60
672c4e4
624f956
eb4346e
669ba6a
b731344
5c3c7f4
9052d94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,6 +29,7 @@ | |
from datahub.ingestion.api.report import Report | ||
from datahub.ingestion.api.source_helpers import ( | ||
auto_browse_path_v2, | ||
auto_lowercase_urns, | ||
auto_materialize_referenced_tags, | ||
auto_status_aspect, | ||
auto_workunit_reporter, | ||
|
@@ -192,7 +193,19 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: | |
self.ctx.pipeline_config.flags.generate_browse_path_v2_dry_run | ||
) | ||
|
||
auto_lowercase_dataset_urns: Optional[MetadataWorkUnitProcessor] = None | ||
if ( | ||
self.ctx.pipeline_config | ||
and self.ctx.pipeline_config.source | ||
and self.ctx.pipeline_config.source.config | ||
and hasattr( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now that I'm looking at this again, it's actually kinda tricky - There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. now, I handle both case |
||
self.ctx.pipeline_config.source.config, "convert_urns_to_lowercase" | ||
) | ||
and self.ctx.pipeline_config.source.config.convert_urns_to_lowercase | ||
hsheth2 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
): | ||
auto_lowercase_dataset_urns = auto_lowercase_urns | ||
return [ | ||
auto_lowercase_dataset_urns, | ||
auto_status_aspect, | ||
auto_materialize_referenced_tags, | ||
browse_path_processor, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,11 @@ | |
from avro.schema import Field, RecordSchema | ||
|
||
from datahub.emitter.mcp import MetadataChangeProposalWrapper | ||
from datahub.metadata.schema_classes import DictWrapper | ||
from datahub.metadata.schema_classes import ( | ||
DictWrapper, | ||
MetadataChangeEventClass, | ||
MetadataChangeProposalClass, | ||
) | ||
from datahub.utilities.urns.dataset_urn import DatasetUrn | ||
from datahub.utilities.urns.urn import Urn, guess_entity_type | ||
|
||
|
@@ -32,7 +36,7 @@ def list_urns_with_path( | |
|
||
if isinstance(model, MetadataChangeProposalWrapper): | ||
if model.entityUrn: | ||
urns.append((model.entityUrn, ["urn"])) | ||
urns.append((model.entityUrn, ["entityUrn"])) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice catch |
||
if model.entityKeyAspect: | ||
urns.extend( | ||
_add_prefix_to_paths( | ||
|
@@ -83,7 +87,15 @@ def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[ | |
return [urn for urn, _ in list_urns_with_path(model)] | ||
|
||
|
||
def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: | ||
def transform_urns( | ||
model: Union[ | ||
DictWrapper, | ||
MetadataChangeEventClass, | ||
MetadataChangeProposalClass, | ||
MetadataChangeProposalWrapper, | ||
], | ||
func: Callable[[str], str], | ||
) -> None: | ||
""" | ||
Rewrites all URNs in the given object according to the given function. | ||
""" | ||
|
@@ -95,14 +107,18 @@ def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None: | |
|
||
|
||
def _modify_at_path( | ||
model: Union[DictWrapper, list], path: _Path, new_value: str | ||
model: Union[DictWrapper, MetadataChangeProposalWrapper, list], | ||
path: _Path, | ||
new_value: str, | ||
) -> None: | ||
assert len(path) > 0 | ||
|
||
if len(path) == 1: | ||
if isinstance(path[0], int): | ||
assert isinstance(model, list) | ||
model[path[0]] = new_value | ||
elif isinstance(model, MetadataChangeProposalWrapper): | ||
setattr(model, path[0], new_value) | ||
else: | ||
assert isinstance(model, DictWrapper) | ||
model._inner_dict[path[0]] = new_value | ||
|
@@ -120,7 +136,14 @@ def _lowercase_dataset_urn(dataset_urn: str) -> str: | |
return str(cur_urn) | ||
|
||
|
||
def lowercase_dataset_urns(model: DictWrapper) -> None: | ||
def lowercase_dataset_urns( | ||
model: Union[ | ||
DictWrapper, | ||
MetadataChangeEventClass, | ||
MetadataChangeProposalClass, | ||
MetadataChangeProposalWrapper, | ||
] | ||
) -> None: | ||
def modify_urn(urn: str) -> str: | ||
if guess_entity_type(urn) == "dataset": | ||
return _lowercase_dataset_urn(urn) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: This property isn't specific to just dataset urns, is it?
We should ideally have a consistent property across all sources that lower cases any urns it generates or references to ensure we have clean lineage across all sources.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently, it is only lowercase dataset urns
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is that a design decision? What is the reasoning behind it?