Skip to content

Commit

Permalink
fix(ingest/powerbi): use dataset workspace id as key for parent container (#8994)
Browse files Browse the repository at this point in the history
  • Loading branch information
looppi authored Nov 10, 2023
1 parent d6cb106 commit 1077138
Show file tree
Hide file tree
Showing 4 changed files with 1,004 additions and 341 deletions.
42 changes: 16 additions & 26 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
#########################################################
import logging
from typing import Iterable, List, Optional, Set, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union

import datahub.emitter.mce_builder as builder
import datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes as powerbi_data_classes
Expand Down Expand Up @@ -110,8 +110,7 @@ def __init__(
self.__config = config
self.__reporter = reporter
self.__dataplatform_instance_resolver = dataplatform_instance_resolver
self.processed_datasets: Set[powerbi_data_classes.PowerBIDataset] = set()
self.workspace_key: ContainerKey
self.workspace_key: Optional[ContainerKey] = None

@staticmethod
def urn_to_lowercase(value: str, flag: bool) -> str:
Expand Down Expand Up @@ -374,6 +373,9 @@ def to_datahub_dataset(
f"Mapping dataset={dataset.name}(id={dataset.id}) to datahub dataset"
)

if self.__config.extract_datasets_to_containers:
dataset_mcps.extend(self.generate_container_for_dataset(dataset))

for table in dataset.tables:
# Create a URN for dataset
ds_urn = builder.make_dataset_urn_with_platform_instance(
Expand Down Expand Up @@ -461,7 +463,6 @@ def to_datahub_dataset(

self.append_container_mcp(
dataset_mcps,
workspace,
ds_urn,
dataset,
)
Expand All @@ -473,8 +474,6 @@ def to_datahub_dataset(
dataset.tags,
)

self.processed_datasets.add(dataset)

return dataset_mcps

@staticmethod
Expand Down Expand Up @@ -572,7 +571,6 @@ def tile_custom_properties(tile: powerbi_data_classes.Tile) -> dict:

self.append_container_mcp(
result_mcps,
workspace,
chart_urn,
)

Expand Down Expand Up @@ -695,7 +693,6 @@ def chart_custom_properties(dashboard: powerbi_data_classes.Dashboard) -> dict:

self.append_container_mcp(
list_of_mcps,
workspace,
dashboard_urn,
)

Expand All @@ -711,20 +708,15 @@ def chart_custom_properties(dashboard: powerbi_data_classes.Dashboard) -> dict:
def append_container_mcp(
self,
list_of_mcps: List[MetadataChangeProposalWrapper],
workspace: powerbi_data_classes.Workspace,
entity_urn: str,
dataset: Optional[powerbi_data_classes.PowerBIDataset] = None,
) -> None:
if self.__config.extract_datasets_to_containers and isinstance(
dataset, powerbi_data_classes.PowerBIDataset
):
container_key = dataset.get_dataset_key(self.__config.platform_name)
elif self.__config.extract_workspaces_to_containers:
container_key = workspace.get_workspace_key(
platform_name=self.__config.platform_name,
platform_instance=self.__config.platform_instance,
workspace_id_as_urn_part=self.__config.workspace_id_as_urn_part,
)
elif self.__config.extract_workspaces_to_containers and self.workspace_key:
container_key = self.workspace_key
else:
return None

Expand All @@ -743,6 +735,7 @@ def generate_container_for_workspace(
) -> Iterable[MetadataWorkUnit]:
self.workspace_key = workspace.get_workspace_key(
platform_name=self.__config.platform_name,
platform_instance=self.__config.platform_instance,
workspace_id_as_urn_part=self.__config.workspace_id_as_urn_part,
)
container_work_units = gen_containers(
Expand All @@ -754,15 +747,21 @@ def generate_container_for_workspace(

def generate_container_for_dataset(
self, dataset: powerbi_data_classes.PowerBIDataset
) -> Iterable[MetadataWorkUnit]:
) -> Iterable[MetadataChangeProposalWrapper]:
dataset_key = dataset.get_dataset_key(self.__config.platform_name)
container_work_units = gen_containers(
container_key=dataset_key,
name=dataset.name if dataset.name else dataset.id,
parent_container_key=self.workspace_key,
sub_types=[BIContainerSubTypes.POWERBI_DATASET],
)
return container_work_units

    # The isinstance filter in the comprehension below is just to satisfy mypy
return [
wu.metadata
for wu in container_work_units
if isinstance(wu.metadata, MetadataChangeProposalWrapper)
]

def append_tag_mcp(
self,
Expand Down Expand Up @@ -965,7 +964,6 @@ def to_chart_mcps(

self.append_container_mcp(
list_of_mcps,
workspace,
chart_urn,
)

Expand Down Expand Up @@ -1086,7 +1084,6 @@ def report_to_dashboard(

self.append_container_mcp(
list_of_mcps,
workspace,
dashboard_urn,
)

Expand Down Expand Up @@ -1220,10 +1217,6 @@ def validate_dataset_type_mapping(self):
f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}"
)

def extract_datasets_as_containers(self):
for dataset in self.mapper.processed_datasets:
yield from self.mapper.generate_container_for_dataset(dataset)

def extract_independent_datasets(
self, workspace: powerbi_data_classes.Workspace
) -> Iterable[MetadataWorkUnit]:
Expand Down Expand Up @@ -1270,9 +1263,6 @@ def get_workspace_workunit(
):
yield work_unit

if self.source_config.extract_datasets_to_containers:
yield from self.extract_datasets_as_containers()

yield from self.extract_independent_datasets(workspace)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
Expand Down
Loading

0 comments on commit 1077138

Please sign in to comment.