Skip to content

Commit

Permalink
feat(ingestion/looker): ingest explore tags into the DataHub (datahub…
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored May 23, 2024
1 parent 92780e6 commit e361d28
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
TagPropertiesClass,
TagSnapshotClass,
)
from datahub.metadata.urns import TagUrn
from datahub.utilities.lossy_collections import LossyList, LossySet
from datahub.utilities.url_util import remove_port_from_url

Expand Down Expand Up @@ -669,6 +670,7 @@ class LookerExplore:
joins: Optional[List[str]] = None
fields: Optional[List[ViewField]] = None # the fields exposed in this explore
source_file: Optional[str] = None
tags: List[str] = dataclasses_field(default_factory=list)

@validator("name")
def remove_quotes(cls, v):
Expand Down Expand Up @@ -770,6 +772,7 @@ def from_dict(
# This method is getting called from lookml_source's get_internal_workunits method
# & upstream_views_file_path is not in use in that code flow
upstream_views_file_path={},
tags=cast(List, dict.get("tags")) if dict.get("tags") is not None else [],
)

@classmethod # noqa: C901
Expand All @@ -786,7 +789,6 @@ def from_api( # noqa: C901
try:
explore = client.lookml_model_explore(model, explore_name)
views: Set[str] = set()

lkml_fields: List[
LookmlModelExploreField
] = explore_field_set_to_lkml_fields(explore)
Expand Down Expand Up @@ -956,6 +958,7 @@ def from_api( # noqa: C901
),
upstream_views_file_path=upstream_views_file_path,
source_file=explore.source_file,
tags=list(explore.tags) if explore.tags is not None else [],
)
except SDKError as e:
if "<title>Looker Not Found (404)</title>" in str(e):
Expand Down Expand Up @@ -1133,6 +1136,20 @@ def _to_metadata_events( # noqa: C901
mcp,
]

# Add tags
explore_tag_urns: List[TagAssociationClass] = []
for tag in self.tags:
tag_urn = TagUrn(tag)
explore_tag_urns.append(TagAssociationClass(tag_urn.urn()))
proposals.append(
MetadataChangeProposalWrapper(
entityUrn=tag_urn.urn(),
aspect=tag_urn.to_key_aspect(),
)
)
if explore_tag_urns:
dataset_snapshot.aspects.append(GlobalTagsClass(explore_tag_urns))

# If extracting embeds is enabled, produce an MCP for embed URL.
if extract_embed_urls:
embed_mcp = create_embed_mcp(
Expand Down
66 changes: 64 additions & 2 deletions metadata-ingestion/tests/integration/looker/test_looker.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, cast
from typing import Any, Dict, List, Optional, Union, cast
from unittest import mock

import pytest
Expand All @@ -24,9 +24,12 @@
WriteQuery,
)

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError
from datahub.ingestion.source.looker import looker_common, looker_usage
from datahub.ingestion.source.looker.looker_common import LookerExplore
from datahub.ingestion.source.looker.looker_config import LookerCommonConfig
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
LookerAPIConfig,
Expand All @@ -37,6 +40,8 @@
UserViewField,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import GlobalTagsClass, MetadataChangeEventClass
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
get_current_checkpoint_from_pipeline,
Expand Down Expand Up @@ -481,7 +486,9 @@ def setup_mock_explore_unaliased_with_joins(mocked_client):


def setup_mock_explore(
mocked_client: Any, additional_lkml_fields: List[LookmlModelExploreField] = []
mocked_client: Any,
additional_lkml_fields: List[LookmlModelExploreField] = [],
**additional_explore_fields: Any,
) -> None:
mock_model = mock.MagicMock(project_name="lkml_samples")
mocked_client.lookml_model.return_value = mock_model
Expand All @@ -508,6 +515,7 @@ def setup_mock_explore(
dimensions=lkml_fields,
),
source_file="test_source_file.lkml",
**additional_explore_fields,
)


Expand Down Expand Up @@ -1058,3 +1066,57 @@ def test_upstream_cll(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
assert (
looker_explore.fields[2].upstream_fields[0] == "dataset_lineages.createdon"
)


@freeze_time(FROZEN_TIME)
def test_explore_tags(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
mocked_client = mock.MagicMock()

with mock.patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
mock_checkpoint.return_value = mock_datahub_graph

tags: List[str] = ["metrics", "all"]

mock_sdk.return_value = mocked_client
setup_mock_explore(
mocked_client,
tags=tags,
)

looker_explore: Optional[LookerExplore] = looker_common.LookerExplore.from_api(
model="fake",
explore_name="my_explore_name",
client=mocked_client,
reporter=mock.MagicMock(),
source_config=mock.MagicMock(),
)

assert looker_explore is not None
assert looker_explore.name == "my_explore_name"
assert looker_explore.tags == tags

mcps: Optional[
List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]
] = looker_explore._to_metadata_events(
config=LookerCommonConfig(),
reporter=SourceReport(),
base_url="fake",
extract_embed_urls=False,
)

expected_tag_urns: List[str] = ["urn:li:tag:metrics", "urn:li:tag:all"]

actual_tag_urns: List[str] = []
if mcps:
for mcp in mcps:
if isinstance(mcp, MetadataChangeEventClass):
for aspect in mcp.proposedSnapshot.aspects:
if isinstance(aspect, GlobalTagsClass):
actual_tag_urns = [
tag_association.tag for tag_association in aspect.tags
]

assert expected_tag_urns == actual_tag_urns

0 comments on commit e361d28

Please sign in to comment.