Skip to content

Commit

Permalink
feat(ingest): add auto_materialize_referenced_tags helper (#7626)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Mar 20, 2023
1 parent a27f82c commit cbd8e14
Show file tree
Hide file tree
Showing 10 changed files with 762 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
from datahub.utilities.perf_timer import PerfTimer
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.source_helpers import (
auto_materialize_referenced_tags,
auto_stale_entity_removal,
auto_status_aspect,
auto_workunit_reporter,
Expand Down Expand Up @@ -551,12 +552,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
yield from self.generate_lineage(project.id)

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_workunit_reporter(
self.report,
auto_status_aspect(self.get_workunits_internal()),
),
return auto_materialize_referenced_tags(
auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_workunit_reporter(
self.report,
auto_status_aspect(self.get_workunits_internal()),
),
)
)

def _get_projects(self, conn: bigquery.Client) -> List[BigqueryProject]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
from datahub.specific.dataset import DatasetPatchBuilder
from datahub.utilities.mapping import Constants, OperationProcessor
from datahub.utilities.source_helpers import (
auto_materialize_referenced_tags,
auto_stale_entity_removal,
auto_status_aspect,
)
Expand Down Expand Up @@ -888,9 +889,11 @@ def load_nodes(self) -> Tuple[List[DBTNode], Dict[str, Optional[str]]]:
raise NotImplementedError()

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
return auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
return auto_materialize_referenced_tags(
auto_stale_entity_removal(
self.stale_entity_removal_handler,
auto_status_aspect(self.get_workunits_internal()),
)
)

def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
Expand Down
38 changes: 38 additions & 0 deletions metadata-ingestion/src/datahub/utilities/source_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,18 @@
MetadataChangeEventClass,
MetadataChangeProposalClass,
StatusClass,
TagKeyClass,
)
from datahub.utilities.urns.tag_urn import TagUrn
from datahub.utilities.urns.urn import guess_entity_type
from datahub.utilities.urns.urn_iter import list_urns


def auto_workunit(
stream: Iterable[Union[MetadataChangeEventClass, MetadataChangeProposalWrapper]]
) -> Iterable[MetadataWorkUnit]:
"""Convert a stream of MCEs and MCPs to a stream of :class:`MetadataWorkUnit`s."""

for item in stream:
if isinstance(item, MetadataChangeEventClass):
yield MetadataWorkUnit(id=f"{item.proposedSnapshot.urn}/mce", mce=item)
Expand Down Expand Up @@ -109,3 +114,36 @@ def auto_workunit_reporter(
for wu in stream:
report.report_workunit(wu)
yield wu


def auto_materialize_referenced_tags(
stream: Iterable[MetadataWorkUnit],
active: bool = True,
) -> Iterable[MetadataWorkUnit]:
"""For all references to tags, emit a tag key aspect to ensure that the tag exists in our backend."""

if not active:
yield from stream
return

referenced_tags = set()
tags_with_aspects = set()

for wu in stream:
for urn in list_urns(wu.metadata):
if guess_entity_type(urn) == "tag":
referenced_tags.add(urn)

urn = wu.get_urn()
if guess_entity_type(urn) == "tag":
tags_with_aspects.add(urn)

yield wu

for urn in referenced_tags - tags_with_aspects:
tag_urn = TagUrn.create_from_string(urn)

yield MetadataChangeProposalWrapper(
entityUrn=urn,
aspect=TagKeyClass(name=tag_urn.get_entity_id()[0]),
).as_workunit()
51 changes: 44 additions & 7 deletions metadata-ingestion/src/datahub/utilities/urns/urn_iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,23 @@

from avro.schema import Field, RecordSchema

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import DictWrapper
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.utilities.urns.urn import Urn, guess_entity_type

_Path = List[Union[str, int]]


def list_urns_with_path(model: DictWrapper) -> List[Tuple[str, _Path]]:
def _add_prefix_to_paths(
prefix: _Path, items: List[Tuple[str, _Path]]
) -> List[Tuple[str, _Path]]:
return [(urn, [*prefix, *path]) for urn, path in items]


def list_urns_with_path(
model: Union[DictWrapper, MetadataChangeProposalWrapper]
) -> List[Tuple[str, _Path]]:
"""List urns in the given model with their paths.
Args:
Expand All @@ -19,10 +28,26 @@ def list_urns_with_path(model: DictWrapper) -> List[Tuple[str, _Path]]:
A list of tuples of the form (urn, path), where path is a list of keys.
"""

schema: RecordSchema = model.RECORD_SCHEMA

urns: List[Tuple[str, _Path]] = []

if isinstance(model, MetadataChangeProposalWrapper):
if model.entityUrn:
urns.append((model.entityUrn, ["urn"]))
if model.entityKeyAspect:
urns.extend(
_add_prefix_to_paths(
["entityKeyAspect"], list_urns_with_path(model.entityKeyAspect)
)
)
if model.aspect:
urns.extend(
_add_prefix_to_paths(["aspect"], list_urns_with_path(model.aspect))
)

return urns

schema: RecordSchema = model.RECORD_SCHEMA

for key, value in model.items():
if not value:
continue
Expand All @@ -31,13 +56,13 @@ def list_urns_with_path(model: DictWrapper) -> List[Tuple[str, _Path]]:
is_urn = field_schema.get_prop("Urn") is not None

if isinstance(value, DictWrapper):
for urn, path in list_urns_with_path(value):
urns.append((urn, [key, *path]))
urns.extend(_add_prefix_to_paths([key], list_urns_with_path(value)))
elif isinstance(value, list):
for i, item in enumerate(value):
if isinstance(item, DictWrapper):
for urn, path in list_urns_with_path(item):
urns.append((urn, [key, i, *path]))
urns.extend(
_add_prefix_to_paths([key, i], list_urns_with_path(item))
)
elif is_urn:
urns.append((item, [key, i]))
elif is_urn:
Expand All @@ -46,6 +71,18 @@ def list_urns_with_path(model: DictWrapper) -> List[Tuple[str, _Path]]:
return urns


def list_urns(model: Union[DictWrapper, MetadataChangeProposalWrapper]) -> List[str]:
"""List urns in the given model.
Args:
model: The model to list urns from.
Returns: A list of URNs contained in the given model.
"""

return [urn for urn, _ in list_urns_with_path(model)]


def transform_urns(model: DictWrapper, func: Callable[[str], str]) -> None:
"""
Rewrites all URNs in the given object according to the given function.
Expand Down
Loading

0 comments on commit cbd8e14

Please sign in to comment.