Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
yoonhyejin committed Dec 23, 2024
1 parent 4f8c9bb commit 8e18be9
Show file tree
Hide file tree
Showing 2 changed files with 433 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
)
from datahub.metadata.schema_classes import (
AuditStampClass,
DataPlatformInstanceClass,
DataProcessInstanceRunEventClass,
DataProcessInstanceRunResultClass,
DataProcessRunStatusClass,
DataProcessTypeClass,
SubTypesClass,
ContainerClass,
)
from datahub.metadata.urns import DataPlatformInstanceUrn, DataPlatformUrn, ContainerUrn
from datahub.utilities.str_enum import StrEnum
from datahub.utilities.urns.data_flow_urn import DataFlowUrn
from datahub.utilities.urns.data_job_urn import DataJobUrn
from datahub.utilities.urns.data_process_instance_urn import DataProcessInstanceUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from datahub.emitter.mcp_builder import ContainerKey


class DataProcessInstanceKey(DatahubKey):
Expand Down Expand Up @@ -61,7 +66,9 @@ class DataProcessInstance:
orchestrator: str
cluster: Optional[str] = None
type: str = DataProcessTypeClass.BATCH_SCHEDULED
template_urn: Optional[Union[DataJobUrn, DataFlowUrn, DatasetUrn]] = None
template_urn: Optional[
Union[DataJobUrn, DataFlowUrn, DatasetUrn, ContainerUrn]
] = None
parent_instance: Optional[DataProcessInstanceUrn] = None
properties: Dict[str, str] = field(default_factory=dict)
url: Optional[str] = None
Expand All @@ -71,6 +78,10 @@ class DataProcessInstance:
_template_object: Optional[Union[DataJob, DataFlow]] = field(
init=False, default=None, repr=False
)
data_platform: Optional[str] = None
data_plaform_instance: Optional[str] = None
subtype: Optional[str] = None
container_urn: Optional[str] = None

def __post_init__(self):
self.urn = DataProcessInstanceUrn(
Expand All @@ -80,6 +91,36 @@ def __post_init__(self):
id=self.id,
).guid()
)
if self.data_platform is None:
self.data_platform = self.orchestrator

try:
# We first try to create from string assuming its an urn
self.data_platform = str(
DataPlatformUrn.create_from_string(self.data_platform)
)
except Exception:
# If it fails, we assume its an id
self.data_platform = str(DataPlatformUrn.create_from_id(self.data_platform))

if self.data_plaform_instance is None and self.cluster is not None:
self.data_plaform_instance = self.cluster

if self.data_plaform_instance is not None:
try:
# We first try to create from string assuming its an urn
self.data_plaform_instance = str(
DataPlatformInstanceUrn.create_from_string(
self.data_plaform_instance
)
)
except Exception:
# If it fails, we assume its an id
self.data_plaform_instance = str(
DataPlatformInstanceUrn(
platform=self.data_platform, instance=self.data_plaform_instance
)
)

def start_event_mcp(
self, start_timestamp_millis: int, attempt: Optional[int] = None
Expand Down Expand Up @@ -269,6 +310,29 @@ def generate_mcp(
)
yield mcp

assert self.data_platform

mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=DataPlatformInstanceClass(
platform=self.data_platform, instance=self.data_plaform_instance
),
)
yield mcp

if self.subtype:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn), aspect=SubTypesClass(typeNames=[self.subtype])
)
yield mcp

if self.container_urn:
mcp = MetadataChangeProposalWrapper(
entityUrn=str(self.urn),
aspect=ContainerClass(container=self.container_urn),
)
yield mcp

yield from self.generate_inlet_outlet_mcp(materialize_iolets=materialize_iolets)

@staticmethod
Expand Down Expand Up @@ -331,6 +395,31 @@ def from_datajob(
dpi.outlets = datajob.outlets
return dpi

@staticmethod
def from_container(
    container_key: ContainerKey,
    id: str,
) -> "DataProcessInstance":
    """
    Generates a DataProcessInstance that belongs to a Container.

    :param container_key: (ContainerKey) the key of the container this
        process instance runs inside; its platform urn determines the
        instance's orchestrator, and its urn is attached as the parent
        container of the instance
    :param id: (str) the id for the DataProcessInstance
    :return: DataProcessInstance
    """
    # Derive the orchestrator name from the container's platform urn,
    # e.g. "urn:li:dataPlatform:airflow" -> "airflow".
    dpi: DataProcessInstance = DataProcessInstance(
        id=id,
        orchestrator=DataPlatformUrn.from_string(
            container_key.platform
        ).platform_name,
        template_urn=None,
        container_urn=container_key.as_urn(),
    )

    return dpi

@staticmethod
def from_dataflow(dataflow: DataFlow, id: str) -> "DataProcessInstance":
"""
Expand Down
Loading

0 comments on commit 8e18be9

Please sign in to comment.