
fix(ingest/powerbi): use dataset workspace id as key for parent container #8994

Merged
39 changes: 17 additions & 22 deletions metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py
@@ -111,7 +111,7 @@ def __init__(
         self.__reporter = reporter
         self.__dataplatform_instance_resolver = dataplatform_instance_resolver
         self.processed_datasets: Set[powerbi_data_classes.PowerBIDataset] = set()
-        self.workspace_key: ContainerKey
+        self.workspace_key: Optional[ContainerKey] = None

     @staticmethod
     def urn_to_lowercase(value: str, flag: bool) -> str:
@@ -374,6 +374,9 @@ def to_datahub_dataset(
             f"Mapping dataset={dataset.name}(id={dataset.id}) to datahub dataset"
         )

+        if self.__config.extract_datasets_to_containers:
+            dataset_mcps.extend(self.generate_container_for_dataset(dataset))
+
         for table in dataset.tables:
             # Create a URN for dataset
             ds_urn = builder.make_dataset_urn_with_platform_instance(
@@ -461,7 +464,6 @@ def to_datahub_dataset(

             self.append_container_mcp(
                 dataset_mcps,
-                workspace,
                 ds_urn,
                 dataset,
             )
@@ -572,7 +574,6 @@ def tile_custom_properties(tile: powerbi_data_classes.Tile) -> dict:

         self.append_container_mcp(
             result_mcps,
-            workspace,
             chart_urn,
         )

@@ -695,7 +696,6 @@ def chart_custom_properties(dashboard: powerbi_data_classes.Dashboard) -> dict:

         self.append_container_mcp(
             list_of_mcps,
-            workspace,
             dashboard_urn,
         )

@@ -711,20 +711,15 @@ def chart_custom_properties(dashboard: powerbi_data_classes.Dashboard) -> dict:
     def append_container_mcp(
         self,
         list_of_mcps: List[MetadataChangeProposalWrapper],
-        workspace: powerbi_data_classes.Workspace,
         entity_urn: str,
         dataset: Optional[powerbi_data_classes.PowerBIDataset] = None,
     ) -> None:
         if self.__config.extract_datasets_to_containers and isinstance(
             dataset, powerbi_data_classes.PowerBIDataset
         ):
             container_key = dataset.get_dataset_key(self.__config.platform_name)
-        elif self.__config.extract_workspaces_to_containers:
-            container_key = workspace.get_workspace_key(
-                platform_name=self.__config.platform_name,
-                platform_instance=self.__config.platform_instance,
-                workspace_id_as_urn_part=self.__config.workspace_id_as_urn_part,
-            )
+        elif self.__config.extract_workspaces_to_containers and self.workspace_key:
+            container_key = self.workspace_key
         else:
             return None

@@ -743,6 +738,7 @@ def generate_container_for_workspace(
     ) -> Iterable[MetadataWorkUnit]:
         self.workspace_key = workspace.get_workspace_key(
             platform_name=self.__config.platform_name,
+            platform_instance=self.__config.platform_instance,
Comment from the PR author: This was missing, which meant that if platform_instance was set, the actual container metadata had a different urn than the references that were supposed to point to the same container.

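A minimal, self-contained sketch of why those urns diverge, assuming that a container guid is derived from a hash over every populated field of the container key; the hashing helper and all values below are illustrative stand-ins, not code from this PR:

```python
import hashlib
import json
from typing import Optional


def container_guid(platform: str, workspace_id: str, instance: Optional[str] = None) -> str:
    # Toy stand-in for container-key hashing: every populated field
    # participates, so omitting platform_instance changes the guid.
    fields = {"platform": platform, "workspace": workspace_id}
    if instance is not None:
        fields["instance"] = instance
    return hashlib.md5(json.dumps(fields, sort_keys=True).encode("utf-8")).hexdigest()


# Hypothetical workspace id and instance name:
with_instance = container_guid("powerbi", "4bd10256-e999", instance="prod")
without_instance = container_guid("powerbi", "4bd10256-e999")

# The references were built from one guid, the container itself from the other:
assert with_instance != without_instance
print(f"urn:li:container:{with_instance}")
print(f"urn:li:container:{without_instance}")
```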
             workspace_id_as_urn_part=self.__config.workspace_id_as_urn_part,
         )
         container_work_units = gen_containers(
@@ -754,15 +750,21 @@

     def generate_container_for_dataset(
         self, dataset: powerbi_data_classes.PowerBIDataset
-    ) -> Iterable[MetadataWorkUnit]:
+    ) -> Iterable[MetadataChangeProposalWrapper]:
         dataset_key = dataset.get_dataset_key(self.__config.platform_name)
         container_work_units = gen_containers(
             container_key=dataset_key,
             name=dataset.name if dataset.name else dataset.id,
             parent_container_key=self.workspace_key,
             sub_types=[BIContainerSubTypes.POWERBI_DATASET],
         )
-        return container_work_units
+
+        # The if statement here is just to satisfy mypy
+        return [
+            wu.metadata
+            for wu in container_work_units
+            if isinstance(wu.metadata, MetadataChangeProposalWrapper)
+        ]
Comment from the PR author: This is kind of ugly, but I couldn't find any other implementation of gen_containers, and for this change it's easier to unwrap the metadata from the work unit than to create another implementation for container creation.

Reply from a collaborator: This is fine - we should probably make gen_containers return MCPs instead of work units.

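If this pattern recurs, the unwrapping could be factored into a small helper; a minimal sketch under the same assumption that each work unit wraps either an MCP or some other payload (the helper name unwrap_mcps is invented for illustration):

```python
from typing import Iterable, List

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit


def unwrap_mcps(
    work_units: Iterable[MetadataWorkUnit],
) -> List[MetadataChangeProposalWrapper]:
    # Keep only work units whose payload is an MCP; the isinstance check
    # mirrors the one in generate_container_for_dataset and satisfies mypy.
    return [
        wu.metadata
        for wu in work_units
        if isinstance(wu.metadata, MetadataChangeProposalWrapper)
    ]
```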

     def append_tag_mcp(
         self,
@@ -965,7 +967,6 @@ def to_chart_mcps(

         self.append_container_mcp(
             list_of_mcps,
-            workspace,
             chart_urn,
         )

@@ -1086,7 +1087,6 @@ def report_to_dashboard(

         self.append_container_mcp(
             list_of_mcps,
-            workspace,
             dashboard_urn,
         )

@@ -1220,10 +1220,6 @@ def validate_dataset_type_mapping(self):
             f"Dataset lineage would get ingested for data-platform = {self.source_config.dataset_type_mapping}"
         )

-    def extract_datasets_as_containers(self):
-        for dataset in self.mapper.processed_datasets:
-            yield from self.mapper.generate_container_for_dataset(dataset)
-
     def extract_independent_datasets(
         self, workspace: powerbi_data_classes.Workspace
     ) -> Iterable[MetadataWorkUnit]:
@@ -1238,6 +1234,8 @@ def extract_independent_datasets(
     def get_workspace_workunit(
         self, workspace: powerbi_data_classes.Workspace
     ) -> Iterable[MetadataWorkUnit]:
+        self.mapper.processed_datasets = set()
+
Comment from the PR author: This works around the leakage of processed datasets across workspaces by resetting processed_datasets to an empty set for every single workspace.

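A self-contained toy (all names and values hypothetical) of the leakage this reset avoids: the mapper instance is shared across workspaces, so without the reset, containers for datasets from an earlier workspace would be emitted again under the next workspace:

```python
from typing import Set


class ToyMapper:
    # Stands in for the shared PowerBI mapper whose processed_datasets
    # persists across get_workspace_workunit calls unless reset.
    def __init__(self) -> None:
        self.processed_datasets: Set[str] = set()


def emit_dataset_containers(mapper: ToyMapper, workspace: str) -> None:
    # One container per dataset the mapper has seen; leaked state means
    # datasets from earlier workspaces reappear under the wrong parent.
    for ds in sorted(mapper.processed_datasets):
        print(f"container for {ds!r} with parent workspace {workspace!r}")


mapper = ToyMapper()
mapper.processed_datasets.add("dataset_a")   # ingested in workspace A
emit_dataset_containers(mapper, "A")         # dataset_a under A - correct

mapper.processed_datasets.add("dataset_b")   # workspace B, no reset
emit_dataset_containers(mapper, "B")         # dataset_a leaks under B!

mapper.processed_datasets = set()            # the fix: reset per workspace
mapper.processed_datasets.add("dataset_b")
emit_dataset_containers(mapper, "B")         # only dataset_b - correct
```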

         if self.source_config.extract_workspaces_to_containers:
             workspace_workunits = self.mapper.generate_container_for_workspace(
                 workspace
@@ -1270,9 +1268,6 @@ def get_workspace_workunit(
         ):
             yield work_unit

-        if self.source_config.extract_datasets_to_containers:
-            yield from self.extract_datasets_as_containers()
-
         yield from self.extract_independent_datasets(workspace)

     def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: