Skip to content

Commit

Permalink
feat(mssql): platform instance aspect for dataflow and datajob entiti…
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor authored and yoonhyejin committed Dec 23, 2024
1 parent 24ee398 commit 0516fb2
Show file tree
Hide file tree
Showing 4 changed files with 574 additions and 228 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Union

from datahub.emitter.mce_builder import make_data_flow_urn, make_data_job_urn
from datahub.emitter.mce_builder import (
make_data_flow_urn,
make_data_job_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
)
from datahub.metadata.schema_classes import (
DataFlowInfoClass,
DataJobInfoClass,
DataJobInputOutputClass,
DataPlatformInstanceClass,
)


Expand Down Expand Up @@ -204,6 +210,18 @@ def as_datajob_info_aspect(self) -> DataJobInfoClass:
status=self.status,
)

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.flow.platform_instance:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.entity.flow.orchestrator),
instance=make_dataplatform_instance_urn(
platform=self.entity.flow.orchestrator,
instance=self.entity.flow.platform_instance,
),
)
return None


@dataclass
class MSSQLDataFlow:
Expand Down Expand Up @@ -238,3 +256,14 @@ def as_dataflow_info_aspect(self) -> DataFlowInfoClass:
customProperties=self.flow_properties,
externalUrl=self.external_url,
)

@property
def as_maybe_platform_instance_aspect(self) -> Optional[DataPlatformInstanceClass]:
if self.entity.platform_instance:
return DataPlatformInstanceClass(
platform=make_data_platform_urn(self.entity.orchestrator),
instance=make_dataplatform_instance_urn(
self.entity.orchestrator, self.entity.platform_instance
),
)
return None
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,13 @@ def construct_job_workunits(
aspect=data_job.as_datajob_info_aspect,
).as_workunit()

data_platform_instance_aspect = data_job.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
aspect=data_platform_instance_aspect,
).as_workunit()

if include_lineage:
yield MetadataChangeProposalWrapper(
entityUrn=data_job.urn,
Expand All @@ -654,6 +661,13 @@ def construct_flow_workunits(
entityUrn=data_flow.urn,
aspect=data_flow.as_dataflow_info_aspect,
).as_workunit()

data_platform_instance_aspect = data_flow.as_maybe_platform_instance_aspect
if data_platform_instance_aspect:
yield MetadataChangeProposalWrapper(
entityUrn=data_flow.urn,
aspect=data_platform_instance_aspect,
).as_workunit()
# TODO: Add SubType when it appear

def get_inspectors(self) -> Iterable[Inspector]:
Expand Down
Loading

0 comments on commit 0516fb2

Please sign in to comment.